diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/ipcpd/normal/cdap_request.c | 157 | ||||
| -rw-r--r-- | src/ipcpd/normal/cdap_request.h | 68 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 390 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 501 | 
5 files changed, 347 insertions, 770 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 67a7953b..5f85dd89 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -25,7 +25,6 @@ protobuf_generate_c(RO_SRCS RO_HDRS ro.proto)  set(SOURCE_FILES    # Add source files here    addr_auth.c -  cdap_request.c    crc32.c    dir.c    fmgr.c diff --git a/src/ipcpd/normal/cdap_request.c b/src/ipcpd/normal/cdap_request.c deleted file mode 100644 index 8409b508..00000000 --- a/src/ipcpd/normal/cdap_request.c +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP - RIB Manager - CDAP request - * - *    Sander Vrijders   <sander.vrijders@intec.ugent.be> - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#include <ouroboros/config.h> -#include <ouroboros/time_utils.h> -#include <ouroboros/errno.h> - -#include "cdap_request.h" - -#include <stdlib.h> - -struct cdap_request * cdap_request_create(enum cdap_opcode code, -                                          char *           name, -                                          int              invoke_id, -                                          struct cdap *    instance) -{ -        struct cdap_request * creq = malloc(sizeof(*creq)); -        pthread_condattr_t cattr; - -        if (creq == NULL) -                return NULL; - -        creq->code = code; -        creq->name = name; -        creq->invoke_id = invoke_id; -        creq->instance = instance; -        creq->state = REQ_INIT; -        creq->result = -1; - -        pthread_condattr_init(&cattr); -#ifndef __APPLE__ -        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); -#endif -        pthread_cond_init(&creq->cond, &cattr); -        pthread_mutex_init(&creq->lock, NULL); - -        INIT_LIST_HEAD(&creq->next); - -        return creq; -} - -void cdap_request_destroy(struct cdap_request * creq) -{ -        if (creq == NULL) -                return; - -        pthread_mutex_lock(&creq->lock); - -        if (creq->state == REQ_DESTROY) { -                pthread_mutex_unlock(&creq->lock); -                return; -        } - -        if (creq->state == REQ_INIT) -                creq->state = REQ_DONE; - -        if (creq->state == REQ_PENDING) { -                creq->state = REQ_DESTROY; -                pthread_cond_broadcast(&creq->cond); -        } - -        while (creq->state != REQ_DONE) -                pthread_cond_wait(&creq->cond, &creq->lock); - -        pthread_mutex_unlock(&creq->lock); - -        pthread_cond_destroy(&creq->cond); -        pthread_mutex_destroy(&creq->lock); - -        if (creq->name != NULL) -                free(creq->name); - -        free(creq); -} - -int cdap_request_wait(struct cdap_request * creq) -{ -        struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000), -                                   (CDAP_REPLY_TIMEOUT % 1000) * MILLION}; -        struct timespec abstime; -        int ret = -1; - -        if (creq == NULL) -                return -EINVAL; - -        clock_gettime(CLOCK_REALTIME, &abstime); -        ts_add(&abstime, &timeout, &abstime); - -        pthread_mutex_lock(&creq->lock); - -        if (creq->state != REQ_INIT) { -                pthread_mutex_unlock(&creq->lock); -                return -EINVAL; -        } - -        creq->state = REQ_PENDING; - -        while (creq->state == REQ_PENDING) { -                if ((ret = -pthread_cond_timedwait(&creq->cond, -                                                   &creq->lock, -                                                   &abstime)) == -ETIMEDOUT) { -                        break; -                } -        } - -        if (creq->state == REQ_DESTROY) -                ret = -1; - -        creq->state = REQ_DONE; -        pthread_cond_broadcast(&creq->cond); - -        pthread_mutex_unlock(&creq->lock); - -        return ret; -} - -void cdap_request_respond(struct cdap_request * creq, int response) -{ -        if (creq == NULL) -                return; - -        pthread_mutex_lock(&creq->lock); - -        if (creq->state != REQ_PENDING) { -                pthread_mutex_unlock(&creq->lock); -                return; -        } - -        creq->state = REQ_RESPONSE; -        creq->result = response; -        pthread_cond_broadcast(&creq->cond); - -        while (creq->state == REQ_RESPONSE) -                pthread_cond_wait(&creq->cond, &creq->lock); - -        pthread_mutex_unlock(&creq->lock); -} diff --git a/src/ipcpd/normal/cdap_request.h b/src/ipcpd/normal/cdap_request.h deleted file mode 100644 index 9cccfda5..00000000 --- a/src/ipcpd/normal/cdap_request.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Ouroboros - Copyright (C) 2016 - * - * Normal IPCP - RIB Manager - CDAP request - * - *    Sander Vrijders   <sander.vrijders@intec.ugent.be> - *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - */ - -#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H -#define OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H - -#include <ouroboros/config.h> -#include <ouroboros/cdap.h> -#include <ouroboros/list.h> - -#include <pthread.h> - -enum creq_state { -        REQ_INIT = 0, -        REQ_PENDING, -        REQ_RESPONSE, -        REQ_DONE, -        REQ_DESTROY -}; - -struct cdap_request { -        struct list_head next; - -        enum cdap_opcode code; -        char *           name; -        int              invoke_id; -        struct cdap *    instance; - -        int              result; - -        enum creq_state  state; -        pthread_cond_t   cond; -        pthread_mutex_t  lock; -}; - -struct cdap_request * cdap_request_create(enum cdap_opcode code, -                                          char *           name, -                                          int              invoke_id, -                                          struct cdap *    instance); - -void                  cdap_request_destroy(struct cdap_request * creq); - -int                   cdap_request_wait(struct cdap_request * creq); - -void                  cdap_request_respond(struct cdap_request * creq, -                                           int                   response); - -#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H */ diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8e416aa4..6684db7c 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t;  #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ -struct np1_flow { -        int           fd; -        cep_id_t      cep_id; -        enum qos_cube qos; -}; - -struct nm1_flow { -        int           fd; -        enum qos_cube qos; -}; -  struct { -        pthread_t          nm1_flow_acceptor; -        struct nm1_flow ** nm1_flows; +        flow_set_t *       nm1_set[QOS_CUBE_MAX]; +        fqueue_t *         nm1_fqs[QOS_CUBE_MAX];          pthread_rwlock_t   nm1_flows_lock; -        flow_set_t *       nm1_set; -        pthread_t          nm1_sdu_reader; -        struct np1_flow ** np1_flows; -        struct np1_flow ** np1_flows_cep; +        flow_set_t *       np1_set[QOS_CUBE_MAX]; +        fqueue_t *         np1_fqs[QOS_CUBE_MAX];          pthread_rwlock_t   np1_flows_lock; -        flow_set_t *       np1_set; + +        cep_id_t           np1_fd_to_cep_id[AP_MAX_FLOWS]; +        int                np1_cep_id_to_fd[IPCPD_MAX_CONNS]; + +        pthread_t          nm1_flow_acceptor; +        pthread_t          nm1_sdu_reader;          pthread_t          np1_sdu_reader;          /* FIXME: Replace with PFF */          int fd;  } fmgr; -static int add_nm1_fd(int fd, -                      enum qos_cube qos) -{ -        struct nm1_flow * tmp; - -        tmp = malloc(sizeof(*tmp)); -        if (tmp == NULL) -                return -1; - -        tmp->fd = fd; -        tmp->qos = qos; - -        pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); -        fmgr.nm1_flows[fd] = tmp; -        pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - -        flow_set_add(fmgr.nm1_set, fd); - -        /* FIXME: Temporary, until we have a PFF */ -        fmgr.fd = fd; - -        return 0; -} - -static int add_np1_fd(int           fd, -                      cep_id_t      cep_id, -                      enum qos_cube qos) -{ -        struct np1_flow * flow; - -        flow = malloc(sizeof(*flow)); -        if (flow == NULL) -                return -1; - -        flow->cep_id = cep_id; -        flow->qos = qos; -        flow->fd = fd; - -        fmgr.np1_flows[fd] = flow; -        fmgr.np1_flows_cep[cep_id] = flow; - -        return 0; -} -  static void * fmgr_nm1_acceptor(void * o)  {          int       fd;          char *    ae_name; +        qoscube_t cube;          qosspec_t qs;          (void) o; @@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o)                  if (!(strcmp(ae_name, MGMT_AE) == 0 ||                        strcmp(ae_name, DT_AE) == 0)) {                          if (flow_alloc_resp(fd, -1)) -                                LOG_ERR("Failed to reply to flow allocation."); +                                LOG_WARN("Failed to reply to flow allocation.");                          flow_dealloc(fd);                          continue;                  }                  if (flow_alloc_resp(fd, 0)) { -                        LOG_ERR("Failed to reply to flow allocation."); +                        LOG_WARN("Failed to reply to flow allocation.");                          flow_dealloc(fd);                          continue;                  } @@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o)                  if (strcmp(ae_name, MGMT_AE) == 0) {                          if (ribmgr_add_flow(fd)) { -                                LOG_ERR("Failed to hand fd to RIB."); +                                LOG_WARN("Failed to hand fd to RIB.");                                  flow_dealloc(fd);                                  continue;                          }                  } else { -                        /* FIXME: Pass correct QoS cube */ -                        if (add_nm1_fd(fd, QOS_CUBE_BE)) { -                                LOG_ERR("Failed to add fd to list."); +                        ipcp_flow_get_qoscube(fd, &cube); +                        if (flow_set_add(fmgr.nm1_set[cube], fd)) { +                                LOG_WARN("Failed to add fd.");                                  flow_dealloc(fd);                                  continue;                          } +                        /* FIXME: Temporary, until we have a PFF */ +                        fmgr.fd = fd;                  }                  free(ae_name); @@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o)  {          struct shm_du_buff * sdb;          struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; -        struct np1_flow * flow;          int fd; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1; + +        int i = 0; +        int ret;          (void) o;          while (true) { -                int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); +                /* FIXME: replace with scheduling policy call */ +                i = (i + 1) % QOS_CUBE_MAX; + +                ret = flow_event_wait(fmgr.np1_set[i], +                                      fmgr.np1_fqs[i], +                                      &timeout);                  if (ret == -ETIMEDOUT)                          continue;                  if (ret < 0) { -                        LOG_ERR("Event error: %d.", ret); +                        LOG_WARN("Event error: %d.", ret);                          continue;                  } -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) { -                                LOG_ERR("Failed to read SDU from fd %d.", fd); +                                LOG_WARN("Failed to read SDU from fd %d.", fd);                                  continue;                          }                          pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -                        flow = fmgr.np1_flows[fd]; -                        if (flow == NULL) { -                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                                ipcp_flow_del(sdb); -                                LOG_ERR("Failed to retrieve flow."); -                                continue; -                        } - -                        if (frct_i_write_sdu(flow->cep_id, sdb)) { +                        if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {                                  pthread_rwlock_unlock(&fmgr.np1_flows_lock);                                  ipcp_flow_del(sdb); -                                LOG_ERR("Failed to hand SDU to FRCT."); +                                LOG_WARN("Failed to hand SDU to FRCT.");                                  continue;                          } @@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o)          struct shm_du_buff * sdb;          struct pci * pci;          int fd; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1; +        int i = 0; +        int ret;          (void) o;          while (true) { -                int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); +                /* FIXME: replace with scheduling policy call */ +                i = (i + 1) % QOS_CUBE_MAX; + +                ret = flow_event_wait(fmgr.nm1_set[i], +                                      fmgr.nm1_fqs[i], +                                      &timeout);                  if (ret == -ETIMEDOUT)                          continue; @@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o)                          continue;                  } -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  LOG_ERR("Failed to read SDU from fd %d.", fd);                                  continue; @@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o)          return (void *) 0;  } -int fmgr_init() +static void fmgr_destroy_flows(void)  {          int i; -        fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); -        if (fmgr.nm1_flows == NULL) -                return -1; - -        fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); -        if (fmgr.np1_flows == NULL) { -                free(fmgr.nm1_flows); -                return -1; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                flow_set_destroy(fmgr.nm1_set[i]); +                flow_set_destroy(fmgr.np1_set[i]); +                fqueue_destroy(fmgr.nm1_fqs[i]); +                fqueue_destroy(fmgr.np1_fqs[i]);          } +} -        fmgr.np1_flows_cep = -                malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); -        if (fmgr.np1_flows_cep == NULL) { -                free(fmgr.np1_flows); -                free(fmgr.nm1_flows); -                return -1; -        } +int fmgr_init() +{ +        int i; -        for (i = 0; i < IRMD_MAX_FLOWS; i++) { -                fmgr.nm1_flows[i] = NULL; -                fmgr.np1_flows[i] = NULL; -                fmgr.np1_flows_cep[i] = NULL; -        } +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; + +        for (i = 0; i < IPCPD_MAX_CONNS; ++i) +                fmgr.np1_cep_id_to_fd[i] = -1;          pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);          pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); -        fmgr.np1_set = flow_set_create(); -        if (fmgr.np1_set == NULL) { -                free(fmgr.np1_flows_cep); -                free(fmgr.np1_flows); -                free(fmgr.nm1_flows); -                return -1; -        } +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fmgr.np1_set[i] = flow_set_create(); +                if (fmgr.np1_set == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                } -        fmgr.nm1_set = flow_set_create(); -        if (fmgr.nm1_set == NULL) { -                flow_set_destroy(fmgr.np1_set); -                free(fmgr.np1_flows_cep); -                free(fmgr.np1_flows); -                free(fmgr.nm1_flows); -                return -1; +                fmgr.np1_fqs[i] = fqueue_create(); +                if (fmgr.np1_fqs[i] == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                } + +                fmgr.nm1_set[i] = flow_set_create(); +                if (fmgr.nm1_set == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                } + +                fmgr.nm1_fqs[i] = fqueue_create(); +                if (fmgr.nm1_fqs[i] == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                }          }          pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); @@ -378,6 +332,7 @@ int fmgr_init()  int fmgr_fini()  {          int i; +        int j;          pthread_cancel(fmgr.nm1_flow_acceptor);          pthread_cancel(fmgr.np1_sdu_reader); @@ -387,29 +342,25 @@ int fmgr_fini()          pthread_join(fmgr.np1_sdu_reader, NULL);          pthread_join(fmgr.nm1_sdu_reader, NULL); -        for (i = 0; i < IRMD_MAX_FLOWS; i++) { -                if (fmgr.nm1_flows[i] == NULL) -                        continue; -                flow_dealloc(fmgr.nm1_flows[i]->fd); -                free(fmgr.nm1_flows[i]); -        } +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                for (j = 0; j < QOS_CUBE_MAX; ++j) +                        if (flow_set_has(fmgr.nm1_set[j], i)) { +                                flow_dealloc(i); +                                flow_set_del(fmgr.nm1_set[j], i); +                        }          pthread_rwlock_destroy(&fmgr.nm1_flows_lock);          pthread_rwlock_destroy(&fmgr.np1_flows_lock); -        flow_set_destroy(fmgr.nm1_set); -        flow_set_destroy(fmgr.np1_set); -        free(fmgr.np1_flows_cep); -        free(fmgr.np1_flows); -        free(fmgr.nm1_flows); +        fmgr_destroy_flows();          return 0;  } -int fmgr_np1_alloc(int           fd, -                   char *        dst_ap_name, -                   char *        src_ae_name, -                   enum qos_cube qos) +int fmgr_np1_alloc(int       fd, +                   char *    dst_ap_name, +                   char *    src_ae_name, +                   qoscube_t qos)  {          cep_id_t cep_id;          buffer_t buf; @@ -478,10 +429,8 @@ int fmgr_np1_alloc(int           fd,          free(ro_data); -        if (add_np1_fd(fd, cep_id, qos)) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                return -1; -        } +        fmgr.np1_fd_to_cep_id[fd] = cep_id; +        fmgr.np1_cep_id_to_fd[cep_id] = fd;          pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -491,16 +440,13 @@ int fmgr_np1_alloc(int           fd,  /* Call under np1_flows lock */  static int np1_flow_dealloc(int fd)  { -        struct np1_flow * flow;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;          buffer_t buf;          int ret; +        qoscube_t cube; -        flow_set_del(fmgr.np1_set, fd); - -        flow = fmgr.np1_flows[fd]; -        if (flow == NULL) -                return -1; +        ipcp_flow_get_qoscube(fd, &cube); +        flow_set_del(fmgr.np1_set[cube], fd);          msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; @@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd)          buf.data = malloc(buf.len);          if (buf.data == NULL) -                return -1; +                return -ENOMEM;          flow_alloc_msg__pack(&msg, buf.data); -        ret = frct_i_destroy(flow->cep_id, &buf); +        ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); -        fmgr.np1_flows[fd] = NULL; -        fmgr.np1_flows_cep[flow->cep_id] = NULL; +        fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; +        fmgr.np1_fd_to_cep_id[fd] = -1; -        free(flow);          free(buf.data);          return ret; @@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd)  int fmgr_np1_alloc_resp(int fd, int response)  { -        struct np1_flow * flow;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;          buffer_t buf; -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        flow = fmgr.np1_flows[fd]; -        if (flow == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                return -1; -        } -          msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;          msg.response = response;          msg.has_response = true;          buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +        if (buf.len == 0)                  return -1; -        }          buf.data = malloc(buf.len); -        if (buf.data == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                return -1; -        } +        if (buf.data == NULL) +                return -ENOMEM;          flow_alloc_msg__pack(&msg, buf.data); +        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); +          if (response < 0) { -                frct_i_destroy(flow->cep_id, &buf); +                frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);                  free(buf.data); -                fmgr.np1_flows[fd] = NULL; -                fmgr.np1_flows_cep[flow->cep_id] = NULL; -                free(flow); +                fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] +                        = INVALID_CEP_ID; +                fmgr.np1_fd_to_cep_id[fd] = -1;          } else { -                if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { +                qoscube_t cube; +                ipcp_flow_get_qoscube(fd, &cube); +                if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {                          pthread_rwlock_unlock(&fmgr.np1_flows_lock);                          return -1;                  } -                flow_set_add(fmgr.np1_set, fd); +                flow_set_add(fmgr.np1_set[cube], fd);          }          pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd)          int ret;          pthread_rwlock_wrlock(&fmgr.np1_flows_lock); +          ret = np1_flow_dealloc(fd); +          pthread_rwlock_unlock(&fmgr.np1_flows_lock);          return ret;  } -int fmgr_np1_post_buf(cep_id_t   cep_id, -                      buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)  { -        struct np1_flow * flow;          int ret = 0;          int fd;          flow_alloc_msg_t * msg; - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); +        qoscube_t cube;          /* Depending on the message call the function in ipcp-dev.h */          msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);          if (msg == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock);                  LOG_ERR("Failed to unpack flow alloc message");                  return -1;          } @@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t   cep_id,                                         msg->dst_name,                                         msg->src_ae_name);                  if (fd < 0) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          LOG_ERR("Failed to get fd for flow.");                          return -1;                  } -                if (add_np1_fd(fd, cep_id, msg->qos_cube)) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("Failed to add np1 flow."); -                        return -1; -                } +                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + +                fmgr.np1_fd_to_cep_id[fd] = cep_id; +                fmgr.np1_cep_id_to_fd[cep_id] = fd; + +                pthread_rwlock_unlock(&fmgr.np1_flows_lock);                  break;          case FLOW_ALLOC_CODE__FLOW_REPLY: -                flow = fmgr.np1_flows_cep[cep_id]; -                if (flow == NULL) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("No such flow in flow manager."); -                        return -1; -                } +                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); -                ret = ipcp_flow_alloc_reply(flow->fd, msg->response); +                fd = fmgr.np1_cep_id_to_fd[cep_id]; +                ret = ipcp_flow_alloc_reply(fd, msg->response);                  if (msg->response < 0) { -                        fmgr.np1_flows[flow->fd] = NULL; -                        fmgr.np1_flows_cep[cep_id] = NULL; -                        free(flow); +                        fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; +                        fmgr.np1_cep_id_to_fd[cep_id] = -1;                  } else { -                        flow_set_add(fmgr.np1_set, flow->fd); +                        ipcp_flow_get_qoscube(fd, &cube); +                        flow_set_add(fmgr.np1_set[cube], +                                     fmgr.np1_cep_id_to_fd[cep_id]);                  } +                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                  break;          case FLOW_ALLOC_CODE__FLOW_DEALLOC: -                flow = fmgr.np1_flows_cep[cep_id]; -                if (flow == NULL) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("No such flow in flow manager."); -                        return -1; -                } - -                flow_set_del(fmgr.np1_set, flow->fd); - -                ret = flow_dealloc(flow->fd); +                fd = fmgr.np1_cep_id_to_fd[cep_id]; +                ipcp_flow_get_qoscube(fd, &cube); +                flow_set_del(fmgr.np1_set[cube], fd); +                ret = flow_dealloc(fd);                  break;          default:                  LOG_ERR("Got an unknown flow allocation message."); @@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t   cep_id,  int fmgr_np1_post_sdu(cep_id_t             cep_id,                        struct shm_du_buff * sdb)  { -        struct np1_flow * flow; +        int fd;          pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -        flow = fmgr.np1_flows_cep[cep_id]; -        if (flow == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                LOG_ERR("Failed to find N flow."); -                return -1; -        } - -        if (ipcp_flow_write(flow->fd, sdb)) { +        fd = fmgr.np1_cep_id_to_fd[cep_id]; +        if (ipcp_flow_write(fd, sdb)) {                  pthread_rwlock_unlock(&fmgr.np1_flows_lock);                  LOG_ERR("Failed to hand SDU to N flow.");                  return -1; @@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name)          /* FIXME: Request retransmission. */          fd = flow_alloc(dst_name, MGMT_AE, NULL);          if (fd < 0) { -                LOG_ERR("Failed to allocate flow to %s", dst_name); +                LOG_ERR("Failed to allocate flow to %s.", dst_name);                  return -1;          }          result = flow_alloc_res(fd);          if (result < 0) { -                LOG_ERR("Result of flow allocation to %s is %d", +                LOG_ERR("Result of flow allocation to %s is %d.",                          dst_name, result);                  return -1;          }          if (ribmgr_add_flow(fd)) { -                LOG_ERR("Failed to hand file descriptor to RIB manager"); +                LOG_ERR("Failed to hand file descriptor to RIB manager.");                  flow_dealloc(fd);                  return -1;          } @@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name)          return 0;  } -int fmgr_nm1_dt_flow(char * dst_name, -                     enum qos_cube qos) +int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos)  {          int fd;          int result; @@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name,          /* FIXME: Map qos cube on correct QoS. */          fd = flow_alloc(dst_name, DT_AE, NULL);          if (fd < 0) { -                LOG_ERR("Failed to allocate flow to %s", dst_name); +                LOG_ERR("Failed to allocate flow to %s.", dst_name);                  return -1;          }          result = flow_alloc_res(fd);          if (result < 0) { -                LOG_ERR("Result of flow allocation to %s is %d", -                        dst_name, result); +                LOG_ERR("Allocate flow to %s result %d.", dst_name, result);                  return -1;          } -        if (add_nm1_fd(fd, qos)) { -                LOG_ERR("Failed to add file descriptor to list."); -                flow_dealloc(fd); -                return -1; -        } +        flow_set_add(fmgr.nm1_set[qos], fd); + +        /* FIXME: Temporary, until we have a PFF */ +        fmgr.fd = fd;          return 0;  } -int fmgr_nm1_write_sdu(struct pci *         pci, -                       struct shm_du_buff * sdb) +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)  {          if (pci == NULL || sdb == NULL)                  return -1; diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index b0738a0c..1e9bcc18 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -42,7 +42,6 @@  #include "dt_const.h"  #include "frct.h"  #include "ipcp.h" -#include "cdap_request.h"  #include "ro.h"  #include "path.h"  #include "dir.h" @@ -85,22 +84,28 @@ struct rnode {  };  struct mgmt_flow { +        struct list_head next; +          struct cdap *    instance;          int              fd; -        struct list_head next; + +        pthread_t        handler;  };  struct ro_sub { +        struct list_head    next; +          int                 sid; +          char *              name;          struct ro_sub_ops * ops; -        struct list_head    next;  };  struct ro_id { +        struct list_head next; +          uint64_t         seqno;          char *           full_name; -        struct list_head next;  };  struct { @@ -124,9 +129,6 @@ struct {          struct list_head    flows;          pthread_rwlock_t    flows_lock; -        struct list_head    cdap_reqs; -        pthread_mutex_t     cdap_reqs_lock; -          struct addr_auth *  addr_auth;          enum pol_addr_auth  addr_auth_type;  } rib; @@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name,          pthread_rwlock_unlock(&ipcpi.state_lock);  } -/* We only have a create operation for now */ +/* We only have a create operation for now. */  static struct ro_sub_ops ribmgr_sub_ops = {          .ro_created = ribmgr_ro_created,          .ro_updated = NULL, @@ -303,9 +305,12 @@ static void ro_delete_timer(void * o)  {          char * name = (char *) o; -        if (ribmgr_ro_delete(name)) { +        pthread_mutex_lock(&rib.ro_lock); + +        if (ribmgr_ro_delete(name))                  LOG_ERR("Failed to delete %s.", name); -        } + +        pthread_mutex_unlock(&rib.ro_lock);  }  static struct rnode * ribmgr_ro_create(const char *   name, @@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char *   name,                  node = node->child;                  sibling = false; -                /* Search horizontally */ +                /* Search horizontally. */                  while (node != NULL) {                          if (strcmp(node->name, token) == 0) {                                  break; @@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char *   name,          LOG_DBG("Created RO with name %s.", name); -        if (!(attr.expiry.tv_sec == 0 && -              attr.expiry.tv_nsec == 0)) { +        if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) {                  timeout = attr.expiry.tv_sec * 1000 +                          attr.expiry.tv_nsec / MILLION; -                if (timerwheel_add(rib.wheel, ro_delete_timer, -                                   new->full_name, strlen(new->full_name) + 1, -                                   timeout)) { +                if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, +                                   strlen(new->full_name) + 1, timeout))                          LOG_ERR("Failed to add deletion timer of RO."); -                }          }          return new; @@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name,          return node;  } -/* Call while holding cdap_reqs_lock */ -/* FIXME: better not to call blocking functions under any lock */ -int cdap_result_wait(struct cdap * instance, -                     enum cdap_opcode code, -                     char * name, -                     int invoke_id) -{ -        struct cdap_request * req; -        int ret; -        char * name_dup = strdup(name); -        if (name_dup == NULL) -                return -1; - -        req = cdap_request_create(code, name_dup, invoke_id, instance); -        if (req == NULL) { -                free(name_dup); -                return -1; -        } - -        list_add(&req->next, &rib.cdap_reqs); - -        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -        ret = cdap_request_wait(req); - -        pthread_mutex_lock(&rib.cdap_reqs_lock); - -        if (ret == -1)  /* should only be on ipcp shutdown */ -                LOG_DBG("Waiting CDAP request destroyed."); - -        if (ret == -ETIMEDOUT) -                LOG_ERR("CDAP Request timed out."); - -        if (ret) -                LOG_DBG("Unknown error code: %d.", ret); - -        if (!ret) -                ret = req->result; - -        list_del(&req->next); -        cdap_request_destroy(req); - -        return ret; -} -  static int write_ro_msg(struct cdap *    neighbor,                          ro_msg_t *       msg,                          char *           name, @@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap *    neighbor,  {          uint8_t * data;          size_t len; -        int iid = 0; +        cdap_key_t key; +        int ret;          len = ro_msg__get_packed_size(msg);          if (len == 0) @@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap *    neighbor,          ro_msg__pack(msg, data); -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        iid = cdap_send_request(neighbor, code, -                                name, data, len, 0); -        if (iid < 0) { -                pthread_mutex_unlock(&rib.cdap_reqs_lock); +        key = cdap_request_send(neighbor, code, name, data, len, 0); +        if (key < 0) { +                LOG_ERR("Failed to send CDAP request.");                  free(data);                  return -1;          } -        if (cdap_result_wait(neighbor, code, name, iid)) { -                pthread_mutex_unlock(&rib.cdap_reqs_lock); -                free(data); -                LOG_ERR("Remote did not receive RIB object."); +        free(data); + +        ret = cdap_reply_wait(neighbor, key, NULL, NULL); +        if (ret < 0) { +                LOG_ERR("CDAP command with code %d and name %s failed:  %d.", +                        code, name, ret);                  return -1;          } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); -        free(data);          return 0;  } @@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap *    neighbor,  int ribmgr_init()  {          INIT_LIST_HEAD(&rib.flows); -        INIT_LIST_HEAD(&rib.cdap_reqs);          INIT_LIST_HEAD(&rib.subs);          INIT_LIST_HEAD(&rib.ro_ids); @@ -540,17 +495,9 @@ int ribmgr_init()                  return -1;          } -        if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { -                LOG_ERR("Failed to initialize mutex."); -                pthread_rwlock_destroy(&rib.flows_lock); -                free(rib.root); -                return -1; -        } -          if (pthread_mutex_init(&rib.ro_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  free(rib.root);                  return -1;          } @@ -558,7 +505,6 @@ int ribmgr_init()          if (pthread_mutex_init(&rib.subs_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  free(rib.root);                  return -1; @@ -567,7 +513,6 @@ int ribmgr_init()          if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  free(rib.root); @@ -578,7 +523,6 @@ int ribmgr_init()          if (rib.sids == NULL) {                  LOG_ERR("Failed to create bitmap.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -591,7 +535,6 @@ int ribmgr_init()                  LOG_ERR("Failed to create timerwheel.");                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -605,7 +548,6 @@ int ribmgr_init()                  timerwheel_destroy(rib.wheel);                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -633,16 +575,6 @@ int ribmgr_fini()          struct list_head * pos = NULL;          struct list_head * n = NULL; -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        list_for_each_safe(pos, n, &rib.cdap_reqs) { -                struct cdap_request * req = -                        list_entry(pos, struct cdap_request, next); -                free(req->name); -                list_del(&req->next); -                free(req); -        } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); -          pthread_rwlock_wrlock(&rib.flows_lock);          list_for_each_safe(pos, n, &rib.flows) {                  struct mgmt_flow * flow = @@ -668,7 +600,6 @@ int ribmgr_fini()          timerwheel_destroy(rib.wheel);          pthread_mutex_destroy(&rib.subs_lock); -        pthread_mutex_destroy(&rib.cdap_reqs_lock);          pthread_mutex_destroy(&rib.ro_lock);          pthread_rwlock_destroy(&rib.flows_lock);          pthread_mutex_destroy(&rib.ro_ids_lock); @@ -676,49 +607,8 @@ int ribmgr_fini()          return 0;  } -static int ribmgr_cdap_reply(struct cdap * instance, -                             int           invoke_id, -                             int           result, -                             uint8_t *     data, -                             size_t        len) -{ -        struct list_head * pos, * n = NULL; - -        /* We never perform reads on other RIBs */ -        (void) data; -        (void) len; - -        pthread_mutex_lock(&rib.cdap_reqs_lock); - -        list_for_each_safe(pos, n, &rib.cdap_reqs) { -                struct cdap_request * req = -                        list_entry(pos, struct cdap_request, next); -                if (req->instance == instance && -                    req->invoke_id == invoke_id && -                    req->state == REQ_PENDING) { -                        if (result != 0) -                                LOG_ERR("CDAP command with code %d and name %s " -                                        "failed with error %d", -                                        req->code, req->name, result); -                        else -                                LOG_DBG("CDAP command with code %d and name %s " -                                        "executed succesfully", -                                        req->code, req->name); - -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -                        cdap_request_respond(req, result); - -                        pthread_mutex_lock(&rib.cdap_reqs_lock); -                } -        } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -        return 0; -} -  static int ribmgr_cdap_create(struct cdap * instance, -                              int           invoke_id, +                              cdap_key_t    key,                                char *        name,                                ro_msg_t *    msg)  { @@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance,          struct ro_attr attr;          struct rnode * node; +        assert(instance); +          ro_attr_init(&attr);          attr.expiry.tv_sec = msg->sec;          attr.expiry.tv_nsec = msg->nsec; @@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          ro_data = malloc(msg->value.len);          if (ro_data == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          memcpy(ro_data, msg->value.data, msg->value.len); @@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          node = ribmgr_ro_create(name, attr, ro_data, msg->value.len);          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  free(ro_data);                  return -1;          } @@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to create request.");                  return -1;          } @@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance,  }  static int ribmgr_cdap_delete(struct cdap * instance, -                              int           invoke_id, +                              cdap_key_t    key,                                char *        name)  {          struct list_head * p = NULL; @@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,          if (ribmgr_ro_delete(name)) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          } @@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +        if (cdap_reply_send(instance, key, 0, NULL, 0)) {                  LOG_ERR("Failed to send reply to create request.");                  return -1;          } @@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,  }  static int ribmgr_cdap_write(struct cdap * instance, -                             int           invoke_id, +                             cdap_key_t    key,                               char *        name,                               ro_msg_t *    msg,                               uint32_t      flags) @@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          ro_data = malloc(msg->value.len);          if (ro_data == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          memcpy(ro_data, msg->value.data, msg->value.len); @@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock);                  free(ro_data); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          node->seqno = msg->seqno; @@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to write request.");                  return -1;          } @@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          return 0;  } -static int ribmgr_enrol_sync(struct cdap * instance, -                             struct rnode * node) +static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node)  {          int ret = 0; @@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance,  }  static int ribmgr_cdap_start(struct cdap * instance, -                             int           invoke_id, +                             cdap_key_t    key,                               char *        name)  { -        int iid = 0; - -        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_OPERATIONAL && -            strcmp(name, ENROLLMENT) == 0) { +        if (strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("New enrollment request."); -                if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +                pthread_rwlock_wrlock(&ipcpi.state_lock); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        LOG_ERR("IPCP in wrong state."); +                        return -1; +                } + +                if (cdap_reply_send(instance, key, 0, NULL, 0)) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send reply to enrollment request.");                          return -1;                  } -                /* Loop through rtree and send correct objects */ -                LOG_DBGF("Sending ROs that need to be sent on enrolment..."); +                /* Loop through rtree and send correct objects. */ +                LOG_DBG("Sending ROs that need to be sent on enrolment...");                  pthread_mutex_lock(&rib.ro_lock);                  if (ribmgr_enrol_sync(instance, rib.root->child)) { @@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance,                          LOG_ERR("Failed to sync part of the RIB.");                          return -1;                  } +                  pthread_mutex_unlock(&rib.ro_lock);                  LOG_DBGF("Sending stop enrollment..."); -                pthread_mutex_lock(&rib.cdap_reqs_lock); - -                iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, +                key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,                                          NULL, 0, 0); -                if (iid < 0) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                if (key < 0) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send stop of enrollment.");                          return -1;                  } -                if (cdap_result_wait(instance, CDAP_STOP, -                                     ENROLLMENT, iid)) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                if (cdap_reply_wait(instance, key, NULL, NULL)) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  } -                pthread_mutex_unlock(&rib.cdap_reqs_lock); + +                pthread_rwlock_unlock(&ipcpi.state_lock);          } else { -                if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        LOG_ERR("Failed to send reply to start request."); -                        return -1; -                } +                LOG_WARN("Request to start unknown operation."); +                if (cdap_reply_send(instance, key, -1, NULL, 0)) +                        LOG_ERR("Failed to send negative reply.");          } -        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } -static int ribmgr_cdap_stop(struct cdap * instance, -                            int           invoke_id, -                            char *        name) +static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)  {          int ret = 0;          pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_CONFIG && -            strcmp(name, ENROLLMENT) == 0) { +        if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("Stop enrollment received.");                  ipcp_set_state(IPCP_BOOTING);          } else                  ret = -1; -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to send reply to stop request.");                  return -1; @@ -1028,8 +914,7 @@ static void ro_id_delete(void * o)          pthread_mutex_unlock(&rib.ro_ids_lock);  } -static int ro_id_create(char *     name, -                        ro_msg_t * msg) +static int ro_id_create(char * name, ro_msg_t * msg)  {          struct ro_id * tmp; @@ -1062,105 +947,113 @@ static int ro_id_create(char *     name,          return 0;  } -static int ribmgr_cdap_request(struct cdap *    instance, -                               int              invoke_id, -                               enum cdap_opcode opcode, -                               char *           name, -                               uint8_t *        data, -                               size_t           len, -                               uint32_t         flags) +static void * cdap_req_handler(void * o)  { +        struct cdap * instance = (struct cdap *) o; +        enum cdap_opcode opcode; +        char * name; +        uint8_t * data; +        size_t len; +        uint32_t flags;          ro_msg_t * msg; -        int ret = -1;          struct list_head * p = NULL; -        if (opcode == CDAP_START) -                return ribmgr_cdap_start(instance, -                                         invoke_id, -                                         name); -        else if (opcode == CDAP_STOP) -                return ribmgr_cdap_stop(instance, -                                        invoke_id, -                                        name); - -        msg = ro_msg__unpack(NULL, len, data); -        if (msg == NULL) { -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                LOG_ERR("Failed to unpack RO message"); -                return -1; -        } +        assert(instance); -        pthread_mutex_lock(&rib.ro_ids_lock); -        list_for_each(p, &rib.ro_ids) { -                struct ro_id * e = list_entry(p, struct ro_id, next); +        while (true) { +                cdap_key_t key = cdap_request_wait(instance, +                                                   &opcode, +                                                   &name, +                                                   &data, +                                                   &len, +                                                   &flags); +                assert(key >= 0); -                if (strcmp(e->full_name, name) == 0 && -                    e->seqno == msg->seqno) { -                        pthread_mutex_unlock(&rib.ro_ids_lock); -                        ro_msg__free_unpacked(msg, NULL); -                        cdap_send_reply(instance, invoke_id, 0, NULL, 0); -                        LOG_DBG("Already received this RO."); -                        return 0; +                if (opcode == CDAP_START) { +                        if (ribmgr_cdap_start(instance, key, name)) +                                LOG_WARN("CDAP start failed."); +                        continue; +                } +                else if (opcode == CDAP_STOP) { +                        if (ribmgr_cdap_stop(instance, key, name)) +                                LOG_WARN("CDAP stop failed."); +                        continue;                  } -        } -        pthread_mutex_unlock(&rib.ro_ids_lock); - -        if (opcode == CDAP_CREATE) { -                ret = ribmgr_cdap_create(instance, -                                         invoke_id, -                                         name, -                                         msg); -        } else if (opcode == CDAP_WRITE) { -                ret = ribmgr_cdap_write(instance, -                                        invoke_id, -                                        name, msg, -                                        flags); - -        } else if (opcode == CDAP_DELETE) { -                ret = ribmgr_cdap_delete(instance, -                                         invoke_id, -                                         name); -        } else { -                LOG_INFO("Unsupported opcode received."); -                ro_msg__free_unpacked(msg, NULL); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                return -1; -        } -        if (ro_id_create(name, msg)) { -                LOG_ERR("Failed to create RO id."); -                return -1; -        } +                msg = ro_msg__unpack(NULL, len, data); +                if (msg == NULL) { +                        cdap_reply_send(instance, key, -1, NULL, 0); +                        LOG_WARN("Failed to unpack RO message"); +                        continue; +                } -        if (msg->recv_set == ALL_MEMBERS) { -                pthread_rwlock_rdlock(&rib.flows_lock); -                list_for_each(p, &rib.flows) { -                        struct mgmt_flow * e = -                                list_entry(p, struct mgmt_flow, next); +                pthread_mutex_lock(&rib.ro_ids_lock); +                list_for_each(p, &rib.ro_ids) { +                        struct ro_id * e = list_entry(p, struct ro_id, next); -                        /* Don't send it back */ -                        if (e->instance == instance) +                        if (strcmp(e->full_name, name) == 0 && +                            e->seqno == msg->seqno) { +                                pthread_mutex_unlock(&rib.ro_ids_lock); +                                ro_msg__free_unpacked(msg, NULL); +                                cdap_reply_send(instance, key, 0, NULL, 0); +                                LOG_DBG("Already received this RO.");                                  continue; +                        } +                } +                pthread_mutex_unlock(&rib.ro_ids_lock); -                        if (write_ro_msg(e->instance, msg, name, opcode)) { -                                LOG_ERR("Failed to send to a neighbor."); -                                pthread_rwlock_unlock(&rib.flows_lock); +                if (opcode == CDAP_CREATE) { +                        if (ribmgr_cdap_create(instance, key, name, msg)) { +                                LOG_WARN("CDAP create failed.");                                  ro_msg__free_unpacked(msg, NULL); -                                return -1; +                                continue;                          } +                } else if (opcode == CDAP_WRITE) { +                        if (ribmgr_cdap_write(instance, key, name, +                                              msg, flags)) { +                                LOG_WARN("CDAP write failed."); +                                ro_msg__free_unpacked(msg, NULL); +                                continue; +                        } +                } else if (opcode == CDAP_DELETE) { +                        if (ribmgr_cdap_delete(instance, key, name)) { +                                LOG_WARN("CDAP delete failed."); +                                ro_msg__free_unpacked(msg, NULL); +                                continue; +                        } +                } else { +                        LOG_INFO("Unsupported opcode received."); +                        ro_msg__free_unpacked(msg, NULL); +                        cdap_reply_send(instance, key, -1, NULL, 0); +                        continue;                  } -                pthread_rwlock_unlock(&rib.flows_lock); -        } -        ro_msg__free_unpacked(msg, NULL); +                if (ro_id_create(name, msg)) { +                        LOG_WARN("Failed to create RO id."); +                        ro_msg__free_unpacked(msg, NULL); +                        continue; +                } -        return ret; -} +                if (msg->recv_set == ALL_MEMBERS) { +                        pthread_rwlock_rdlock(&rib.flows_lock); +                        list_for_each(p, &rib.flows) { +                                struct mgmt_flow * e = +                                        list_entry(p, struct mgmt_flow, next); -static struct cdap_ops ribmgr_cdap_ops = { -        .cdap_reply   = ribmgr_cdap_reply, -        .cdap_request = ribmgr_cdap_request -}; +                                /* Don't send it back. */ +                                if (e->instance == instance) +                                        continue; + +                                if (write_ro_msg(e->instance, msg, +                                                 name, opcode)) +                                        LOG_WARN("Failed to send to neighbor."); +                        } +                        pthread_rwlock_unlock(&rib.flows_lock); +                } + +                ro_msg__free_unpacked(msg, NULL); +        } +}  int ribmgr_add_flow(int fd)  { @@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd)          flow = malloc(sizeof(*flow));          if (flow == NULL) -                return -1; +                return -ENOMEM; -        instance = cdap_create(&ribmgr_cdap_ops, fd); +        instance = cdap_create(fd);          if (instance == NULL) {                  LOG_ERR("Failed to create CDAP instance");                  free(flow); @@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd)          flow->instance = instance;          flow->fd = fd; +        if (pthread_create(&flow->handler, NULL, +                           cdap_req_handler, instance)) { +                LOG_ERR("Failed to start handler thread for mgt flow."); +                free(flow); +                return -1; +        } +          pthread_rwlock_wrlock(&rib.flows_lock); +          list_add(&flow->next, &rib.flows); +          pthread_rwlock_unlock(&rib.flows_lock);          return 0; @@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd)                  struct mgmt_flow * flow =                          list_entry(pos, struct mgmt_flow, next);                  if (flow->fd == fd) { +                        pthread_cancel(flow->handler);                          if (cdap_destroy(flow->instance))                                  LOG_ERR("Failed to destroy CDAP instance.");                          list_del(&flow->next); @@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf)          size_t len = 0;          struct ro_attr attr; -        if (conf == NULL || -            conf->type != IPCP_NORMAL) { +        if (conf == NULL || conf->type != IPCP_NORMAL) {                  LOG_ERR("Bad DIF configuration."); -                return -1; +                return -EINVAL;          }          ro_attr_init(&attr); @@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          len = static_info_msg__get_packed_size(&stat_info);          if (len == 0) {                  LOG_ERR("Failed to get size of static information."); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          data = malloc(len);          if (data == NULL) {                  LOG_ERR("Failed to allocate memory."); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf)                               attr, data, len) == NULL) {                  LOG_ERR("Failed to create static info RO.");                  free(data); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          if (dir_init()) {                  LOG_ERR("Failed to init directory");                  ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf)                  LOG_ERR("Failed to initialize FRCT.");                  dir_fini();                  ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1296,12 +1193,14 @@ int ribmgr_enrol(void)  {          struct cdap * instance = NULL;          struct mgmt_flow * flow; -        int iid = 0; +        cdap_key_t key; +        int ret;          pthread_rwlock_wrlock(&ipcpi.state_lock);          if (ipcp_get_state() != IPCP_INIT) {                  pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("IPCP in wrong state.");                  return -1;          } @@ -1312,36 +1211,31 @@ int ribmgr_enrol(void)                  ipcp_set_state(IPCP_INIT);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("No flows in RIB.");                  return -1;          } -        flow = list_entry((&rib.flows)->next, struct mgmt_flow, next); +        flow = list_first_entry((&rib.flows), struct mgmt_flow, next);          instance = flow->instance; -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        iid = cdap_send_request(instance, -                                CDAP_START, -                                ENROLLMENT, -                                NULL, 0, 0); -        if (iid < 0) { +        key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); +        if (key < 0) {                  ipcp_set_state(IPCP_INIT); -                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to start enrollment.");                  return -1;          } -        if (cdap_result_wait(instance, CDAP_START, -                             ENROLLMENT, iid)) { +        ret = cdap_reply_wait(instance, key, NULL, NULL); +        if (ret) {                  ipcp_set_state(IPCP_INIT); -                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Failed to start enrollment."); +                LOG_ERR("Failed to enroll: %d.", ret);                  return -1;          } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); +          pthread_rwlock_unlock(&rib.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1351,9 +1245,10 @@ int ribmgr_enrol(void)  int ribmgr_start_policies(void)  {          pthread_rwlock_rdlock(&ipcpi.state_lock); +          if (ipcp_get_state() != IPCP_BOOTING) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Cannot start policies in wrong state"); +                LOG_ERR("Cannot start policies in wrong state.");                  return -1;          }          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void)          }          rib.address = rib.addr_auth->address(); -        LOG_DBG("IPCP has address %lu", (unsigned long) rib.address); +        LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address);          return 0;  } @@ -1380,22 +1275,21 @@ uint64_t ribmgr_address()          return rib.address;  } -static int send_neighbors_ro(char *           name, -                             ro_msg_t *       msg, -                             enum cdap_opcode code) +static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code)  {          struct list_head * p = NULL;          pthread_rwlock_rdlock(&rib.flows_lock); +          list_for_each(p, &rib.flows) {                  struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); -                  if (write_ro_msg(e->instance, msg, name, code)) { -                        LOG_ERR("Failed to send to a neighbor.");                          pthread_rwlock_unlock(&rib.flows_lock); +                        LOG_ERR("Failed to send to a neighbor.");                          return -1;                  }          } +          pthread_rwlock_unlock(&rib.flows_lock);          return 0; @@ -1499,9 +1393,7 @@ int ro_delete(const char * name)          return 0;  } -int ro_write(const char * name, -             uint8_t *    data, -             size_t       len) +int ro_write(const char * name, uint8_t * data, size_t len)  {          struct rnode * node;          ro_msg_t msg = RO_MSG__INIT; @@ -1541,8 +1433,7 @@ int ro_write(const char * name,          return 0;  } -ssize_t ro_read(const char * name, -                uint8_t **   data) +ssize_t ro_read(const char * name, uint8_t ** data)  {          struct rnode * node;          ssize_t        len; @@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name,          return len;  } -ssize_t ro_children(const char * name, -                    char ***     children) +ssize_t ro_children(const char * name, char *** children)  {          struct rnode * node;          struct rnode * child; @@ -1640,8 +1530,7 @@ bool ro_exists(const char * name)          return found;  } -int ro_subscribe(const char *        name, -                 struct ro_sub_ops * ops) +int ro_subscribe(const char * name, struct ro_sub_ops * ops)  {          struct ro_sub * sub;          int sid; | 
