diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ipcpd/local/main.c | 8 | ||||
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 390 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 501 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 8 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 8 | ||||
| -rw-r--r-- | src/irmd/ipcp.c | 12 | ||||
| -rw-r--r-- | src/irmd/ipcp.h | 15 | ||||
| -rw-r--r-- | src/lib/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/lib/cdap.c | 531 | ||||
| -rw-r--r-- | src/lib/cdap_req.c (renamed from src/ipcpd/normal/cdap_request.c) | 56 | ||||
| -rw-r--r-- | src/lib/cdap_req.h (renamed from src/ipcpd/normal/cdap_request.h) | 37 | ||||
| -rw-r--r-- | src/lib/dev.c | 8 | 
13 files changed, 816 insertions, 760 deletions
| diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 30d2d2bd..122beafb 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -217,10 +217,10 @@ static int ipcp_local_name_query(char * name)          return ret;  } -static int ipcp_local_flow_alloc(int           fd, -                                 char *        dst_name, -                                 char *        src_ae_name, -                                 enum qos_cube qos) +static int ipcp_local_flow_alloc(int       fd, +                                 char *    dst_name, +                                 char *    src_ae_name, +                                 qoscube_t qos)  {          int out_fd = -1; diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 67a7953b..5f85dd89 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -25,7 +25,6 @@ protobuf_generate_c(RO_SRCS RO_HDRS ro.proto)  set(SOURCE_FILES    # Add source files here    addr_auth.c -  cdap_request.c    crc32.c    dir.c    fmgr.c diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8e416aa4..6684db7c 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -49,82 +49,31 @@ typedef FlowAllocMsg flow_alloc_msg_t;  #define FD_UPDATE_TIMEOUT 100000 /* nanoseconds */ -struct np1_flow { -        int           fd; -        cep_id_t      cep_id; -        enum qos_cube qos; -}; - -struct nm1_flow { -        int           fd; -        enum qos_cube qos; -}; -  struct { -        pthread_t          nm1_flow_acceptor; -        struct nm1_flow ** nm1_flows; +        flow_set_t *       nm1_set[QOS_CUBE_MAX]; +        fqueue_t *         nm1_fqs[QOS_CUBE_MAX];          pthread_rwlock_t   nm1_flows_lock; -        flow_set_t *       nm1_set; -        pthread_t          nm1_sdu_reader; -        struct np1_flow ** np1_flows; -        struct np1_flow ** np1_flows_cep; +        flow_set_t *       np1_set[QOS_CUBE_MAX]; +        fqueue_t *         np1_fqs[QOS_CUBE_MAX];          pthread_rwlock_t   np1_flows_lock; -        flow_set_t *       np1_set; + +        cep_id_t           np1_fd_to_cep_id[AP_MAX_FLOWS]; +        int                np1_cep_id_to_fd[IPCPD_MAX_CONNS]; + +        pthread_t          nm1_flow_acceptor; +        pthread_t          nm1_sdu_reader;          pthread_t          np1_sdu_reader;          /* FIXME: Replace with PFF */          int fd;  } fmgr; -static int add_nm1_fd(int fd, -                      enum qos_cube qos) -{ -        struct nm1_flow * tmp; - -        tmp = malloc(sizeof(*tmp)); -        if (tmp == NULL) -                return -1; - -        tmp->fd = fd; -        tmp->qos = qos; - -        pthread_rwlock_wrlock(&fmgr.nm1_flows_lock); -        fmgr.nm1_flows[fd] = tmp; -        pthread_rwlock_unlock(&fmgr.nm1_flows_lock); - -        flow_set_add(fmgr.nm1_set, fd); - -        /* FIXME: Temporary, until we have a PFF */ -        fmgr.fd = fd; - -        return 0; -} - -static int add_np1_fd(int           fd, -                      cep_id_t      cep_id, -                      enum qos_cube qos) -{ -        struct np1_flow * flow; - -        flow = malloc(sizeof(*flow)); -        if (flow == NULL) -                return -1; - -        flow->cep_id = cep_id; -        flow->qos = qos; -        flow->fd = fd; - -        fmgr.np1_flows[fd] = flow; -        fmgr.np1_flows_cep[cep_id] = flow; - -        return 0; -} -  static void * fmgr_nm1_acceptor(void * o)  {          int       fd;          char *    ae_name; +        qoscube_t cube;          qosspec_t qs;          (void) o; @@ -150,13 +99,13 @@ static void * fmgr_nm1_acceptor(void * o)                  if (!(strcmp(ae_name, MGMT_AE) == 0 ||                        strcmp(ae_name, DT_AE) == 0)) {                          if (flow_alloc_resp(fd, -1)) -                                LOG_ERR("Failed to reply to flow allocation."); +                                LOG_WARN("Failed to reply to flow allocation.");                          flow_dealloc(fd);                          continue;                  }                  if (flow_alloc_resp(fd, 0)) { -                        LOG_ERR("Failed to reply to flow allocation."); +                        LOG_WARN("Failed to reply to flow allocation.");                          flow_dealloc(fd);                          continue;                  } @@ -166,17 +115,19 @@ static void * fmgr_nm1_acceptor(void * o)                  if (strcmp(ae_name, MGMT_AE) == 0) {                          if (ribmgr_add_flow(fd)) { -                                LOG_ERR("Failed to hand fd to RIB."); +                                LOG_WARN("Failed to hand fd to RIB.");                                  flow_dealloc(fd);                                  continue;                          }                  } else { -                        /* FIXME: Pass correct QoS cube */ -                        if (add_nm1_fd(fd, QOS_CUBE_BE)) { -                                LOG_ERR("Failed to add fd to list."); +                        ipcp_flow_get_qoscube(fd, &cube); +                        if (flow_set_add(fmgr.nm1_set[cube], fd)) { +                                LOG_WARN("Failed to add fd.");                                  flow_dealloc(fd);                                  continue;                          } +                        /* FIXME: Temporary, until we have a PFF */ +                        fmgr.fd = fd;                  }                  free(ae_name); @@ -189,44 +140,40 @@ static void * fmgr_np1_sdu_reader(void * o)  {          struct shm_du_buff * sdb;          struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; -        struct np1_flow * flow;          int fd; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1; + +        int i = 0; +        int ret;          (void) o;          while (true) { -                int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); +                /* FIXME: replace with scheduling policy call */ +                i = (i + 1) % QOS_CUBE_MAX; + +                ret = flow_event_wait(fmgr.np1_set[i], +                                      fmgr.np1_fqs[i], +                                      &timeout);                  if (ret == -ETIMEDOUT)                          continue;                  if (ret < 0) { -                        LOG_ERR("Event error: %d.", ret); +                        LOG_WARN("Event error: %d.", ret);                          continue;                  } -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) { -                                LOG_ERR("Failed to read SDU from fd %d.", fd); +                                LOG_WARN("Failed to read SDU from fd %d.", fd);                                  continue;                          }                          pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -                        flow = fmgr.np1_flows[fd]; -                        if (flow == NULL) { -                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                                ipcp_flow_del(sdb); -                                LOG_ERR("Failed to retrieve flow."); -                                continue; -                        } - -                        if (frct_i_write_sdu(flow->cep_id, sdb)) { +                        if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) {                                  pthread_rwlock_unlock(&fmgr.np1_flows_lock);                                  ipcp_flow_del(sdb); -                                LOG_ERR("Failed to hand SDU to FRCT."); +                                LOG_WARN("Failed to hand SDU to FRCT.");                                  continue;                          } @@ -244,14 +191,18 @@ void * fmgr_nm1_sdu_reader(void * o)          struct shm_du_buff * sdb;          struct pci * pci;          int fd; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1; +        int i = 0; +        int ret;          (void) o;          while (true) { -                int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); +                /* FIXME: replace with scheduling policy call */ +                i = (i + 1) % QOS_CUBE_MAX; + +                ret = flow_event_wait(fmgr.nm1_set[i], +                                      fmgr.nm1_fqs[i], +                                      &timeout);                  if (ret == -ETIMEDOUT)                          continue; @@ -260,7 +211,7 @@ void * fmgr_nm1_sdu_reader(void * o)                          continue;                  } -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  LOG_ERR("Failed to read SDU from fd %d.", fd);                                  continue; @@ -320,52 +271,55 @@ void * fmgr_nm1_sdu_reader(void * o)          return (void *) 0;  } -int fmgr_init() +static void fmgr_destroy_flows(void)  {          int i; -        fmgr.nm1_flows = malloc(sizeof(*(fmgr.nm1_flows)) * IRMD_MAX_FLOWS); -        if (fmgr.nm1_flows == NULL) -                return -1; - -        fmgr.np1_flows = malloc(sizeof(*(fmgr.np1_flows)) * IRMD_MAX_FLOWS); -        if (fmgr.np1_flows == NULL) { -                free(fmgr.nm1_flows); -                return -1; +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                flow_set_destroy(fmgr.nm1_set[i]); +                flow_set_destroy(fmgr.np1_set[i]); +                fqueue_destroy(fmgr.nm1_fqs[i]); +                fqueue_destroy(fmgr.np1_fqs[i]);          } +} -        fmgr.np1_flows_cep = -                malloc(sizeof(*(fmgr.np1_flows_cep)) * IRMD_MAX_FLOWS); -        if (fmgr.np1_flows_cep == NULL) { -                free(fmgr.np1_flows); -                free(fmgr.nm1_flows); -                return -1; -        } +int fmgr_init() +{ +        int i; -        for (i = 0; i < IRMD_MAX_FLOWS; i++) { -                fmgr.nm1_flows[i] = NULL; -                fmgr.np1_flows[i] = NULL; -                fmgr.np1_flows_cep[i] = NULL; -        } +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                fmgr.np1_fd_to_cep_id[i] = INVALID_CEP_ID; + +        for (i = 0; i < IPCPD_MAX_CONNS; ++i) +                fmgr.np1_cep_id_to_fd[i] = -1;          pthread_rwlock_init(&fmgr.nm1_flows_lock, NULL);          pthread_rwlock_init(&fmgr.np1_flows_lock, NULL); -        fmgr.np1_set = flow_set_create(); -        if (fmgr.np1_set == NULL) { -                free(fmgr.np1_flows_cep); -                free(fmgr.np1_flows); -                free(fmgr.nm1_flows); -                return -1; -        } +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                fmgr.np1_set[i] = flow_set_create(); +                if (fmgr.np1_set == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                } -        fmgr.nm1_set = flow_set_create(); -        if (fmgr.nm1_set == NULL) { -                flow_set_destroy(fmgr.np1_set); -                free(fmgr.np1_flows_cep); -                free(fmgr.np1_flows); -                free(fmgr.nm1_flows); -                return -1; +                fmgr.np1_fqs[i] = fqueue_create(); +                if (fmgr.np1_fqs[i] == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                } + +                fmgr.nm1_set[i] = flow_set_create(); +                if (fmgr.nm1_set == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                } + +                fmgr.nm1_fqs[i] = fqueue_create(); +                if (fmgr.nm1_fqs[i] == NULL) { +                        fmgr_destroy_flows(); +                        return -1; +                }          }          pthread_create(&fmgr.nm1_flow_acceptor, NULL, fmgr_nm1_acceptor, NULL); @@ -378,6 +332,7 @@ int fmgr_init()  int fmgr_fini()  {          int i; +        int j;          pthread_cancel(fmgr.nm1_flow_acceptor);          pthread_cancel(fmgr.np1_sdu_reader); @@ -387,29 +342,25 @@ int fmgr_fini()          pthread_join(fmgr.np1_sdu_reader, NULL);          pthread_join(fmgr.nm1_sdu_reader, NULL); -        for (i = 0; i < IRMD_MAX_FLOWS; i++) { -                if (fmgr.nm1_flows[i] == NULL) -                        continue; -                flow_dealloc(fmgr.nm1_flows[i]->fd); -                free(fmgr.nm1_flows[i]); -        } +        for (i = 0; i < AP_MAX_FLOWS; ++i) +                for (j = 0; j < QOS_CUBE_MAX; ++j) +                        if (flow_set_has(fmgr.nm1_set[j], i)) { +                                flow_dealloc(i); +                                flow_set_del(fmgr.nm1_set[j], i); +                        }          pthread_rwlock_destroy(&fmgr.nm1_flows_lock);          pthread_rwlock_destroy(&fmgr.np1_flows_lock); -        flow_set_destroy(fmgr.nm1_set); -        flow_set_destroy(fmgr.np1_set); -        free(fmgr.np1_flows_cep); -        free(fmgr.np1_flows); -        free(fmgr.nm1_flows); +        fmgr_destroy_flows();          return 0;  } -int fmgr_np1_alloc(int           fd, -                   char *        dst_ap_name, -                   char *        src_ae_name, -                   enum qos_cube qos) +int fmgr_np1_alloc(int       fd, +                   char *    dst_ap_name, +                   char *    src_ae_name, +                   qoscube_t qos)  {          cep_id_t cep_id;          buffer_t buf; @@ -478,10 +429,8 @@ int fmgr_np1_alloc(int           fd,          free(ro_data); -        if (add_np1_fd(fd, cep_id, qos)) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                return -1; -        } +        fmgr.np1_fd_to_cep_id[fd] = cep_id; +        fmgr.np1_cep_id_to_fd[cep_id] = fd;          pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -491,16 +440,13 @@ int fmgr_np1_alloc(int           fd,  /* Call under np1_flows lock */  static int np1_flow_dealloc(int fd)  { -        struct np1_flow * flow;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;          buffer_t buf;          int ret; +        qoscube_t cube; -        flow_set_del(fmgr.np1_set, fd); - -        flow = fmgr.np1_flows[fd]; -        if (flow == NULL) -                return -1; +        ipcp_flow_get_qoscube(fd, &cube); +        flow_set_del(fmgr.np1_set[cube], fd);          msg.code = FLOW_ALLOC_CODE__FLOW_DEALLOC; @@ -510,16 +456,15 @@ static int np1_flow_dealloc(int fd)          buf.data = malloc(buf.len);          if (buf.data == NULL) -                return -1; +                return -ENOMEM;          flow_alloc_msg__pack(&msg, buf.data); -        ret = frct_i_destroy(flow->cep_id, &buf); +        ret = frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf); -        fmgr.np1_flows[fd] = NULL; -        fmgr.np1_flows_cep[flow->cep_id] = NULL; +        fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] = INVALID_CEP_ID; +        fmgr.np1_fd_to_cep_id[fd] = -1; -        free(flow);          free(buf.data);          return ret; @@ -527,48 +472,39 @@ static int np1_flow_dealloc(int fd)  int fmgr_np1_alloc_resp(int fd, int response)  { -        struct np1_flow * flow;          flow_alloc_msg_t msg = FLOW_ALLOC_MSG__INIT;          buffer_t buf; -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); - -        flow = fmgr.np1_flows[fd]; -        if (flow == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                return -1; -        } -          msg.code = FLOW_ALLOC_CODE__FLOW_REPLY;          msg.response = response;          msg.has_response = true;          buf.len = flow_alloc_msg__get_packed_size(&msg); -        if (buf.len == 0) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +        if (buf.len == 0)                  return -1; -        }          buf.data = malloc(buf.len); -        if (buf.data == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                return -1; -        } +        if (buf.data == NULL) +                return -ENOMEM;          flow_alloc_msg__pack(&msg, buf.data); +        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); +          if (response < 0) { -                frct_i_destroy(flow->cep_id, &buf); +                frct_i_destroy(fmgr.np1_fd_to_cep_id[fd], &buf);                  free(buf.data); -                fmgr.np1_flows[fd] = NULL; -                fmgr.np1_flows_cep[flow->cep_id] = NULL; -                free(flow); +                fmgr.np1_cep_id_to_fd[fmgr.np1_fd_to_cep_id[fd]] +                        = INVALID_CEP_ID; +                fmgr.np1_fd_to_cep_id[fd] = -1;          } else { -                if (frct_i_accept(flow->cep_id, &buf, flow->qos)) { +                qoscube_t cube; +                ipcp_flow_get_qoscube(fd, &cube); +                if (frct_i_accept(fmgr.np1_fd_to_cep_id[fd], &buf, cube)) {                          pthread_rwlock_unlock(&fmgr.np1_flows_lock);                          return -1;                  } -                flow_set_add(fmgr.np1_set, fd); +                flow_set_add(fmgr.np1_set[cube], fd);          }          pthread_rwlock_unlock(&fmgr.np1_flows_lock); @@ -581,27 +517,25 @@ int fmgr_np1_dealloc(int fd)          int ret;          pthread_rwlock_wrlock(&fmgr.np1_flows_lock); +          ret = np1_flow_dealloc(fd); +          pthread_rwlock_unlock(&fmgr.np1_flows_lock);          return ret;  } -int fmgr_np1_post_buf(cep_id_t   cep_id, -                      buffer_t * buf) +int fmgr_np1_post_buf(cep_id_t cep_id, buffer_t * buf)  { -        struct np1_flow * flow;          int ret = 0;          int fd;          flow_alloc_msg_t * msg; - -        pthread_rwlock_wrlock(&fmgr.np1_flows_lock); +        qoscube_t cube;          /* Depending on the message call the function in ipcp-dev.h */          msg = flow_alloc_msg__unpack(NULL, buf->len, buf->data);          if (msg == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock);                  LOG_ERR("Failed to unpack flow alloc message");                  return -1;          } @@ -612,51 +546,41 @@ int fmgr_np1_post_buf(cep_id_t   cep_id,                                         msg->dst_name,                                         msg->src_ae_name);                  if (fd < 0) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock);                          flow_alloc_msg__free_unpacked(msg, NULL);                          LOG_ERR("Failed to get fd for flow.");                          return -1;                  } -                if (add_np1_fd(fd, cep_id, msg->qos_cube)) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("Failed to add np1 flow."); -                        return -1; -                } +                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); + +                fmgr.np1_fd_to_cep_id[fd] = cep_id; +                fmgr.np1_cep_id_to_fd[cep_id] = fd; + +                pthread_rwlock_unlock(&fmgr.np1_flows_lock);                  break;          case FLOW_ALLOC_CODE__FLOW_REPLY: -                flow = fmgr.np1_flows_cep[cep_id]; -                if (flow == NULL) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("No such flow in flow manager."); -                        return -1; -                } +                pthread_rwlock_wrlock(&fmgr.np1_flows_lock); -                ret = ipcp_flow_alloc_reply(flow->fd, msg->response); +                fd = fmgr.np1_cep_id_to_fd[cep_id]; +                ret = ipcp_flow_alloc_reply(fd, msg->response);                  if (msg->response < 0) { -                        fmgr.np1_flows[flow->fd] = NULL; -                        fmgr.np1_flows_cep[cep_id] = NULL; -                        free(flow); +                        fmgr.np1_fd_to_cep_id[fd] = INVALID_CEP_ID; +                        fmgr.np1_cep_id_to_fd[cep_id] = -1;                  } else { -                        flow_set_add(fmgr.np1_set, flow->fd); +                        ipcp_flow_get_qoscube(fd, &cube); +                        flow_set_add(fmgr.np1_set[cube], +                                     fmgr.np1_cep_id_to_fd[cep_id]);                  } +                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                  break;          case FLOW_ALLOC_CODE__FLOW_DEALLOC: -                flow = fmgr.np1_flows_cep[cep_id]; -                if (flow == NULL) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        LOG_ERR("No such flow in flow manager."); -                        return -1; -                } - -                flow_set_del(fmgr.np1_set, flow->fd); - -                ret = flow_dealloc(flow->fd); +                fd = fmgr.np1_cep_id_to_fd[cep_id]; +                ipcp_flow_get_qoscube(fd, &cube); +                flow_set_del(fmgr.np1_set[cube], fd); +                ret = flow_dealloc(fd);                  break;          default:                  LOG_ERR("Got an unknown flow allocation message."); @@ -674,18 +598,12 @@ int fmgr_np1_post_buf(cep_id_t   cep_id,  int fmgr_np1_post_sdu(cep_id_t             cep_id,                        struct shm_du_buff * sdb)  { -        struct np1_flow * flow; +        int fd;          pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -        flow = fmgr.np1_flows_cep[cep_id]; -        if (flow == NULL) { -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                LOG_ERR("Failed to find N flow."); -                return -1; -        } - -        if (ipcp_flow_write(flow->fd, sdb)) { +        fd = fmgr.np1_cep_id_to_fd[cep_id]; +        if (ipcp_flow_write(fd, sdb)) {                  pthread_rwlock_unlock(&fmgr.np1_flows_lock);                  LOG_ERR("Failed to hand SDU to N flow.");                  return -1; @@ -704,19 +622,19 @@ int fmgr_nm1_mgmt_flow(char * dst_name)          /* FIXME: Request retransmission. */          fd = flow_alloc(dst_name, MGMT_AE, NULL);          if (fd < 0) { -                LOG_ERR("Failed to allocate flow to %s", dst_name); +                LOG_ERR("Failed to allocate flow to %s.", dst_name);                  return -1;          }          result = flow_alloc_res(fd);          if (result < 0) { -                LOG_ERR("Result of flow allocation to %s is %d", +                LOG_ERR("Result of flow allocation to %s is %d.",                          dst_name, result);                  return -1;          }          if (ribmgr_add_flow(fd)) { -                LOG_ERR("Failed to hand file descriptor to RIB manager"); +                LOG_ERR("Failed to hand file descriptor to RIB manager.");                  flow_dealloc(fd);                  return -1;          } @@ -724,8 +642,7 @@ int fmgr_nm1_mgmt_flow(char * dst_name)          return 0;  } -int fmgr_nm1_dt_flow(char * dst_name, -                     enum qos_cube qos) +int fmgr_nm1_dt_flow(char * dst_name, qoscube_t qos)  {          int fd;          int result; @@ -733,28 +650,25 @@ int fmgr_nm1_dt_flow(char * dst_name,          /* FIXME: Map qos cube on correct QoS. */          fd = flow_alloc(dst_name, DT_AE, NULL);          if (fd < 0) { -                LOG_ERR("Failed to allocate flow to %s", dst_name); +                LOG_ERR("Failed to allocate flow to %s.", dst_name);                  return -1;          }          result = flow_alloc_res(fd);          if (result < 0) { -                LOG_ERR("Result of flow allocation to %s is %d", -                        dst_name, result); +                LOG_ERR("Allocate flow to %s result %d.", dst_name, result);                  return -1;          } -        if (add_nm1_fd(fd, qos)) { -                LOG_ERR("Failed to add file descriptor to list."); -                flow_dealloc(fd); -                return -1; -        } +        flow_set_add(fmgr.nm1_set[qos], fd); + +        /* FIXME: Temporary, until we have a PFF */ +        fmgr.fd = fd;          return 0;  } -int fmgr_nm1_write_sdu(struct pci *         pci, -                       struct shm_du_buff * sdb) +int fmgr_nm1_write_sdu(struct pci * pci, struct shm_du_buff * sdb)  {          if (pci == NULL || sdb == NULL)                  return -1; diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index b0738a0c..1e9bcc18 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -42,7 +42,6 @@  #include "dt_const.h"  #include "frct.h"  #include "ipcp.h" -#include "cdap_request.h"  #include "ro.h"  #include "path.h"  #include "dir.h" @@ -85,22 +84,28 @@ struct rnode {  };  struct mgmt_flow { +        struct list_head next; +          struct cdap *    instance;          int              fd; -        struct list_head next; + +        pthread_t        handler;  };  struct ro_sub { +        struct list_head    next; +          int                 sid; +          char *              name;          struct ro_sub_ops * ops; -        struct list_head    next;  };  struct ro_id { +        struct list_head next; +          uint64_t         seqno;          char *           full_name; -        struct list_head next;  };  struct { @@ -124,9 +129,6 @@ struct {          struct list_head    flows;          pthread_rwlock_t    flows_lock; -        struct list_head    cdap_reqs; -        pthread_mutex_t     cdap_reqs_lock; -          struct addr_auth *  addr_auth;          enum pol_addr_auth  addr_auth_type;  } rib; @@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name,          pthread_rwlock_unlock(&ipcpi.state_lock);  } -/* We only have a create operation for now */ +/* We only have a create operation for now. */  static struct ro_sub_ops ribmgr_sub_ops = {          .ro_created = ribmgr_ro_created,          .ro_updated = NULL, @@ -303,9 +305,12 @@ static void ro_delete_timer(void * o)  {          char * name = (char *) o; -        if (ribmgr_ro_delete(name)) { +        pthread_mutex_lock(&rib.ro_lock); + +        if (ribmgr_ro_delete(name))                  LOG_ERR("Failed to delete %s.", name); -        } + +        pthread_mutex_unlock(&rib.ro_lock);  }  static struct rnode * ribmgr_ro_create(const char *   name, @@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char *   name,                  node = node->child;                  sibling = false; -                /* Search horizontally */ +                /* Search horizontally. */                  while (node != NULL) {                          if (strcmp(node->name, token) == 0) {                                  break; @@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char *   name,          LOG_DBG("Created RO with name %s.", name); -        if (!(attr.expiry.tv_sec == 0 && -              attr.expiry.tv_nsec == 0)) { +        if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) {                  timeout = attr.expiry.tv_sec * 1000 +                          attr.expiry.tv_nsec / MILLION; -                if (timerwheel_add(rib.wheel, ro_delete_timer, -                                   new->full_name, strlen(new->full_name) + 1, -                                   timeout)) { +                if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, +                                   strlen(new->full_name) + 1, timeout))                          LOG_ERR("Failed to add deletion timer of RO."); -                }          }          return new; @@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name,          return node;  } -/* Call while holding cdap_reqs_lock */ -/* FIXME: better not to call blocking functions under any lock */ -int cdap_result_wait(struct cdap * instance, -                     enum cdap_opcode code, -                     char * name, -                     int invoke_id) -{ -        struct cdap_request * req; -        int ret; -        char * name_dup = strdup(name); -        if (name_dup == NULL) -                return -1; - -        req = cdap_request_create(code, name_dup, invoke_id, instance); -        if (req == NULL) { -                free(name_dup); -                return -1; -        } - -        list_add(&req->next, &rib.cdap_reqs); - -        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -        ret = cdap_request_wait(req); - -        pthread_mutex_lock(&rib.cdap_reqs_lock); - -        if (ret == -1)  /* should only be on ipcp shutdown */ -                LOG_DBG("Waiting CDAP request destroyed."); - -        if (ret == -ETIMEDOUT) -                LOG_ERR("CDAP Request timed out."); - -        if (ret) -                LOG_DBG("Unknown error code: %d.", ret); - -        if (!ret) -                ret = req->result; - -        list_del(&req->next); -        cdap_request_destroy(req); - -        return ret; -} -  static int write_ro_msg(struct cdap *    neighbor,                          ro_msg_t *       msg,                          char *           name, @@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap *    neighbor,  {          uint8_t * data;          size_t len; -        int iid = 0; +        cdap_key_t key; +        int ret;          len = ro_msg__get_packed_size(msg);          if (len == 0) @@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap *    neighbor,          ro_msg__pack(msg, data); -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        iid = cdap_send_request(neighbor, code, -                                name, data, len, 0); -        if (iid < 0) { -                pthread_mutex_unlock(&rib.cdap_reqs_lock); +        key = cdap_request_send(neighbor, code, name, data, len, 0); +        if (key < 0) { +                LOG_ERR("Failed to send CDAP request.");                  free(data);                  return -1;          } -        if (cdap_result_wait(neighbor, code, name, iid)) { -                pthread_mutex_unlock(&rib.cdap_reqs_lock); -                free(data); -                LOG_ERR("Remote did not receive RIB object."); +        free(data); + +        ret = cdap_reply_wait(neighbor, key, NULL, NULL); +        if (ret < 0) { +                LOG_ERR("CDAP command with code %d and name %s failed:  %d.", +                        code, name, ret);                  return -1;          } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); -        free(data);          return 0;  } @@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap *    neighbor,  int ribmgr_init()  {          INIT_LIST_HEAD(&rib.flows); -        INIT_LIST_HEAD(&rib.cdap_reqs);          INIT_LIST_HEAD(&rib.subs);          INIT_LIST_HEAD(&rib.ro_ids); @@ -540,17 +495,9 @@ int ribmgr_init()                  return -1;          } -        if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { -                LOG_ERR("Failed to initialize mutex."); -                pthread_rwlock_destroy(&rib.flows_lock); -                free(rib.root); -                return -1; -        } -          if (pthread_mutex_init(&rib.ro_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  free(rib.root);                  return -1;          } @@ -558,7 +505,6 @@ int ribmgr_init()          if (pthread_mutex_init(&rib.subs_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  free(rib.root);                  return -1; @@ -567,7 +513,6 @@ int ribmgr_init()          if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  free(rib.root); @@ -578,7 +523,6 @@ int ribmgr_init()          if (rib.sids == NULL) {                  LOG_ERR("Failed to create bitmap.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -591,7 +535,6 @@ int ribmgr_init()                  LOG_ERR("Failed to create timerwheel.");                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -605,7 +548,6 @@ int ribmgr_init()                  timerwheel_destroy(rib.wheel);                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -633,16 +575,6 @@ int ribmgr_fini()          struct list_head * pos = NULL;          struct list_head * n = NULL; -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        list_for_each_safe(pos, n, &rib.cdap_reqs) { -                struct cdap_request * req = -                        list_entry(pos, struct cdap_request, next); -                free(req->name); -                list_del(&req->next); -                free(req); -        } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); -          pthread_rwlock_wrlock(&rib.flows_lock);          list_for_each_safe(pos, n, &rib.flows) {                  struct mgmt_flow * flow = @@ -668,7 +600,6 @@ int ribmgr_fini()          timerwheel_destroy(rib.wheel);          pthread_mutex_destroy(&rib.subs_lock); -        pthread_mutex_destroy(&rib.cdap_reqs_lock);          pthread_mutex_destroy(&rib.ro_lock);          pthread_rwlock_destroy(&rib.flows_lock);          pthread_mutex_destroy(&rib.ro_ids_lock); @@ -676,49 +607,8 @@ int ribmgr_fini()          return 0;  } -static int ribmgr_cdap_reply(struct cdap * instance, -                             int           invoke_id, -                             int           result, -                             uint8_t *     data, -                             size_t        len) -{ -        struct list_head * pos, * n = NULL; - -        /* We never perform reads on other RIBs */ -        (void) data; -        (void) len; - -        pthread_mutex_lock(&rib.cdap_reqs_lock); - -        list_for_each_safe(pos, n, &rib.cdap_reqs) { -                struct cdap_request * req = -                        list_entry(pos, struct cdap_request, next); -                if (req->instance == instance && -                    req->invoke_id == invoke_id && -                    req->state == REQ_PENDING) { -                        if (result != 0) -                                LOG_ERR("CDAP command with code %d and name %s " -                                        "failed with error %d", -                                        req->code, req->name, result); -                        else -                                LOG_DBG("CDAP command with code %d and name %s " -                                        "executed succesfully", -                                        req->code, req->name); - -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -                        cdap_request_respond(req, result); - -                        pthread_mutex_lock(&rib.cdap_reqs_lock); -                } -        } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -        return 0; -} -  static int ribmgr_cdap_create(struct cdap * instance, -                              int           invoke_id, +                              cdap_key_t    key,                                char *        name,                                ro_msg_t *    msg)  { @@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance,          struct ro_attr attr;          struct rnode * node; +        assert(instance); +          ro_attr_init(&attr);          attr.expiry.tv_sec = msg->sec;          attr.expiry.tv_nsec = msg->nsec; @@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          ro_data = malloc(msg->value.len);          if (ro_data == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          memcpy(ro_data, msg->value.data, msg->value.len); @@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          node = ribmgr_ro_create(name, attr, ro_data, msg->value.len);          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  free(ro_data);                  return -1;          } @@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to create request.");                  return -1;          } @@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance,  }  static int ribmgr_cdap_delete(struct cdap * instance, -                              int           invoke_id, +                              cdap_key_t    key,                                char *        name)  {          struct list_head * p = NULL; @@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,          if (ribmgr_ro_delete(name)) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          } @@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +        if (cdap_reply_send(instance, key, 0, NULL, 0)) {                  LOG_ERR("Failed to send reply to create request.");                  return -1;          } @@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,  }  static int ribmgr_cdap_write(struct cdap * instance, -                             int           invoke_id, +                             cdap_key_t    key,                               char *        name,                               ro_msg_t *    msg,                               uint32_t      flags) @@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          ro_data = malloc(msg->value.len);          if (ro_data == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          memcpy(ro_data, msg->value.data, msg->value.len); @@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock);                  free(ro_data); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          node->seqno = msg->seqno; @@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to write request.");                  return -1;          } @@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          return 0;  } -static int ribmgr_enrol_sync(struct cdap * instance, -                             struct rnode * node) +static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node)  {          int ret = 0; @@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance,  }  static int ribmgr_cdap_start(struct cdap * instance, -                             int           invoke_id, +                             cdap_key_t    key,                               char *        name)  { -        int iid = 0; - -        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_OPERATIONAL && -            strcmp(name, ENROLLMENT) == 0) { +        if (strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("New enrollment request."); -                if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +                pthread_rwlock_wrlock(&ipcpi.state_lock); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        LOG_ERR("IPCP in wrong state."); +                        return -1; +                } + +                if (cdap_reply_send(instance, key, 0, NULL, 0)) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send reply to enrollment request.");                          return -1;                  } -                /* Loop through rtree and send correct objects */ -                LOG_DBGF("Sending ROs that need to be sent on enrolment..."); +                /* Loop through rtree and send correct objects. */ +                LOG_DBG("Sending ROs that need to be sent on enrolment...");                  pthread_mutex_lock(&rib.ro_lock);                  if (ribmgr_enrol_sync(instance, rib.root->child)) { @@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance,                          LOG_ERR("Failed to sync part of the RIB.");                          return -1;                  } +                  pthread_mutex_unlock(&rib.ro_lock);                  LOG_DBGF("Sending stop enrollment..."); -                pthread_mutex_lock(&rib.cdap_reqs_lock); - -                iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, +                key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,                                          NULL, 0, 0); -                if (iid < 0) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                if (key < 0) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send stop of enrollment.");                          return -1;                  } -                if (cdap_result_wait(instance, CDAP_STOP, -                                     ENROLLMENT, iid)) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                if (cdap_reply_wait(instance, key, NULL, NULL)) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  } -                pthread_mutex_unlock(&rib.cdap_reqs_lock); + +                pthread_rwlock_unlock(&ipcpi.state_lock);          } else { -                if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        LOG_ERR("Failed to send reply to start request."); -                        return -1; -                } +                LOG_WARN("Request to start unknown operation."); +                if (cdap_reply_send(instance, key, -1, NULL, 0)) +                        LOG_ERR("Failed to send negative reply.");          } -        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } -static int ribmgr_cdap_stop(struct cdap * instance, -                            int           invoke_id, -                            char *        name) +static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)  {          int ret = 0;          pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_CONFIG && -            strcmp(name, ENROLLMENT) == 0) { +        if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("Stop enrollment received.");                  ipcp_set_state(IPCP_BOOTING);          } else                  ret = -1; -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to send reply to stop request.");                  return -1; @@ -1028,8 +914,7 @@ static void ro_id_delete(void * o)          pthread_mutex_unlock(&rib.ro_ids_lock);  } -static int ro_id_create(char *     name, -                        ro_msg_t * msg) +static int ro_id_create(char * name, ro_msg_t * msg)  {          struct ro_id * tmp; @@ -1062,105 +947,113 @@ static int ro_id_create(char *     name,          return 0;  } -static int ribmgr_cdap_request(struct cdap *    instance, -                               int              invoke_id, -                               enum cdap_opcode opcode, -                               char *           name, -                               uint8_t *        data, -                               size_t           len, -                               uint32_t         flags) +static void * cdap_req_handler(void * o)  { +        struct cdap * instance = (struct cdap *) o; +        enum cdap_opcode opcode; +        char * name; +        uint8_t * data; +        size_t len; +        uint32_t flags;          ro_msg_t * msg; -        int ret = -1;          struct list_head * p = NULL; -        if (opcode == CDAP_START) -                return ribmgr_cdap_start(instance, -                                         invoke_id, -                                         name); -        else if (opcode == CDAP_STOP) -                return ribmgr_cdap_stop(instance, -                                        invoke_id, -                                        name); - -        msg = ro_msg__unpack(NULL, len, data); -        if (msg == NULL) { -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                LOG_ERR("Failed to unpack RO message"); -                return -1; -        } +        assert(instance); -        pthread_mutex_lock(&rib.ro_ids_lock); -        list_for_each(p, &rib.ro_ids) { -                struct ro_id * e = list_entry(p, struct ro_id, next); +        while (true) { +                cdap_key_t key = cdap_request_wait(instance, +                                                   &opcode, +                                                   &name, +                                                   &data, +                                                   &len, +                                                   &flags); +                assert(key >= 0); -                if (strcmp(e->full_name, name) == 0 && -                    e->seqno == msg->seqno) { -                        pthread_mutex_unlock(&rib.ro_ids_lock); -                        ro_msg__free_unpacked(msg, NULL); -                        cdap_send_reply(instance, invoke_id, 0, NULL, 0); -                        LOG_DBG("Already received this RO."); -                        return 0; +                if (opcode == CDAP_START) { +                        if (ribmgr_cdap_start(instance, key, name)) +                                LOG_WARN("CDAP start failed."); +                        continue; +                } +                else if (opcode == CDAP_STOP) { +                        if (ribmgr_cdap_stop(instance, key, name)) +                                LOG_WARN("CDAP stop failed."); +                        continue;                  } -        } -        pthread_mutex_unlock(&rib.ro_ids_lock); - -        if (opcode == CDAP_CREATE) { -                ret = ribmgr_cdap_create(instance, -                                         invoke_id, -                                         name, -                                         msg); -        } else if (opcode == CDAP_WRITE) { -                ret = ribmgr_cdap_write(instance, -                                        invoke_id, -                                        name, msg, -                                        flags); - -        } else if (opcode == CDAP_DELETE) { -                ret = ribmgr_cdap_delete(instance, -                                         invoke_id, -                                         name); -        } else { -                LOG_INFO("Unsupported opcode received."); -                ro_msg__free_unpacked(msg, NULL); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                return -1; -        } -        if (ro_id_create(name, msg)) { -                LOG_ERR("Failed to create RO id."); -                return -1; -        } +                msg = ro_msg__unpack(NULL, len, data); +                if (msg == NULL) { +                        cdap_reply_send(instance, key, -1, NULL, 0); +                        LOG_WARN("Failed to unpack RO message"); +                        continue; +                } -        if (msg->recv_set == ALL_MEMBERS) { -                pthread_rwlock_rdlock(&rib.flows_lock); -                list_for_each(p, &rib.flows) { -                        struct mgmt_flow * e = -                                list_entry(p, struct mgmt_flow, next); +                pthread_mutex_lock(&rib.ro_ids_lock); +                list_for_each(p, &rib.ro_ids) { +                        struct ro_id * e = list_entry(p, struct ro_id, next); -                        /* Don't send it back */ -                        if (e->instance == instance) +                        if (strcmp(e->full_name, name) == 0 && +                            e->seqno == msg->seqno) { +                                pthread_mutex_unlock(&rib.ro_ids_lock); +                                ro_msg__free_unpacked(msg, NULL); +                                cdap_reply_send(instance, key, 0, NULL, 0); +                                LOG_DBG("Already received this RO.");                                  continue; +                        } +                } +                pthread_mutex_unlock(&rib.ro_ids_lock); -                        if (write_ro_msg(e->instance, msg, name, opcode)) { -                                LOG_ERR("Failed to send to a neighbor."); -                                pthread_rwlock_unlock(&rib.flows_lock); +                if (opcode == CDAP_CREATE) { +                        if (ribmgr_cdap_create(instance, key, name, msg)) { +                                LOG_WARN("CDAP create failed.");                                  ro_msg__free_unpacked(msg, NULL); -                                return -1; +                                continue;                          } +                } else if (opcode == CDAP_WRITE) { +                        if (ribmgr_cdap_write(instance, key, name, +                                              msg, flags)) { +                                LOG_WARN("CDAP write failed."); +                                ro_msg__free_unpacked(msg, NULL); +                                continue; +                        } +                } else if (opcode == CDAP_DELETE) { +                        if (ribmgr_cdap_delete(instance, key, name)) { +                                LOG_WARN("CDAP delete failed."); +                                ro_msg__free_unpacked(msg, NULL); +                                continue; +                        } +                } else { +                        LOG_INFO("Unsupported opcode received."); +                        ro_msg__free_unpacked(msg, NULL); +                        cdap_reply_send(instance, key, -1, NULL, 0); +                        continue;                  } -                pthread_rwlock_unlock(&rib.flows_lock); -        } -        ro_msg__free_unpacked(msg, NULL); +                if (ro_id_create(name, msg)) { +                        LOG_WARN("Failed to create RO id."); +                        ro_msg__free_unpacked(msg, NULL); +                        continue; +                } -        return ret; -} +                if (msg->recv_set == ALL_MEMBERS) { +                        pthread_rwlock_rdlock(&rib.flows_lock); +                        list_for_each(p, &rib.flows) { +                                struct mgmt_flow * e = +                                        list_entry(p, struct mgmt_flow, next); -static struct cdap_ops ribmgr_cdap_ops = { -        .cdap_reply   = ribmgr_cdap_reply, -        .cdap_request = ribmgr_cdap_request -}; +                                /* Don't send it back. */ +                                if (e->instance == instance) +                                        continue; + +                                if (write_ro_msg(e->instance, msg, +                                                 name, opcode)) +                                        LOG_WARN("Failed to send to neighbor."); +                        } +                        pthread_rwlock_unlock(&rib.flows_lock); +                } + +                ro_msg__free_unpacked(msg, NULL); +        } +}  int ribmgr_add_flow(int fd)  { @@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd)          flow = malloc(sizeof(*flow));          if (flow == NULL) -                return -1; +                return -ENOMEM; -        instance = cdap_create(&ribmgr_cdap_ops, fd); +        instance = cdap_create(fd);          if (instance == NULL) {                  LOG_ERR("Failed to create CDAP instance");                  free(flow); @@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd)          flow->instance = instance;          flow->fd = fd; +        if (pthread_create(&flow->handler, NULL, +                           cdap_req_handler, instance)) { +                LOG_ERR("Failed to start handler thread for mgt flow."); +                free(flow); +                return -1; +        } +          pthread_rwlock_wrlock(&rib.flows_lock); +          list_add(&flow->next, &rib.flows); +          pthread_rwlock_unlock(&rib.flows_lock);          return 0; @@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd)                  struct mgmt_flow * flow =                          list_entry(pos, struct mgmt_flow, next);                  if (flow->fd == fd) { +                        pthread_cancel(flow->handler);                          if (cdap_destroy(flow->instance))                                  LOG_ERR("Failed to destroy CDAP instance.");                          list_del(&flow->next); @@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf)          size_t len = 0;          struct ro_attr attr; -        if (conf == NULL || -            conf->type != IPCP_NORMAL) { +        if (conf == NULL || conf->type != IPCP_NORMAL) {                  LOG_ERR("Bad DIF configuration."); -                return -1; +                return -EINVAL;          }          ro_attr_init(&attr); @@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          len = static_info_msg__get_packed_size(&stat_info);          if (len == 0) {                  LOG_ERR("Failed to get size of static information."); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          data = malloc(len);          if (data == NULL) {                  LOG_ERR("Failed to allocate memory."); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf)                               attr, data, len) == NULL) {                  LOG_ERR("Failed to create static info RO.");                  free(data); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          if (dir_init()) {                  LOG_ERR("Failed to init directory");                  ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf)                  LOG_ERR("Failed to initialize FRCT.");                  dir_fini();                  ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1296,12 +1193,14 @@ int ribmgr_enrol(void)  {          struct cdap * instance = NULL;          struct mgmt_flow * flow; -        int iid = 0; +        cdap_key_t key; +        int ret;          pthread_rwlock_wrlock(&ipcpi.state_lock);          if (ipcp_get_state() != IPCP_INIT) {                  pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("IPCP in wrong state.");                  return -1;          } @@ -1312,36 +1211,31 @@ int ribmgr_enrol(void)                  ipcp_set_state(IPCP_INIT);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("No flows in RIB.");                  return -1;          } -        flow = list_entry((&rib.flows)->next, struct mgmt_flow, next); +        flow = list_first_entry((&rib.flows), struct mgmt_flow, next);          instance = flow->instance; -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        iid = cdap_send_request(instance, -                                CDAP_START, -                                ENROLLMENT, -                                NULL, 0, 0); -        if (iid < 0) { +        key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); +        if (key < 0) {                  ipcp_set_state(IPCP_INIT); -                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to start enrollment.");                  return -1;          } -        if (cdap_result_wait(instance, CDAP_START, -                             ENROLLMENT, iid)) { +        ret = cdap_reply_wait(instance, key, NULL, NULL); +        if (ret) {                  ipcp_set_state(IPCP_INIT); -                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Failed to start enrollment."); +                LOG_ERR("Failed to enroll: %d.", ret);                  return -1;          } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); +          pthread_rwlock_unlock(&rib.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1351,9 +1245,10 @@ int ribmgr_enrol(void)  int ribmgr_start_policies(void)  {          pthread_rwlock_rdlock(&ipcpi.state_lock); +          if (ipcp_get_state() != IPCP_BOOTING) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Cannot start policies in wrong state"); +                LOG_ERR("Cannot start policies in wrong state.");                  return -1;          }          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void)          }          rib.address = rib.addr_auth->address(); -        LOG_DBG("IPCP has address %lu", (unsigned long) rib.address); +        LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address);          return 0;  } @@ -1380,22 +1275,21 @@ uint64_t ribmgr_address()          return rib.address;  } -static int send_neighbors_ro(char *           name, -                             ro_msg_t *       msg, -                             enum cdap_opcode code) +static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code)  {          struct list_head * p = NULL;          pthread_rwlock_rdlock(&rib.flows_lock); +          list_for_each(p, &rib.flows) {                  struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); -                  if (write_ro_msg(e->instance, msg, name, code)) { -                        LOG_ERR("Failed to send to a neighbor.");                          pthread_rwlock_unlock(&rib.flows_lock); +                        LOG_ERR("Failed to send to a neighbor.");                          return -1;                  }          } +          pthread_rwlock_unlock(&rib.flows_lock);          return 0; @@ -1499,9 +1393,7 @@ int ro_delete(const char * name)          return 0;  } -int ro_write(const char * name, -             uint8_t *    data, -             size_t       len) +int ro_write(const char * name, uint8_t * data, size_t len)  {          struct rnode * node;          ro_msg_t msg = RO_MSG__INIT; @@ -1541,8 +1433,7 @@ int ro_write(const char * name,          return 0;  } -ssize_t ro_read(const char * name, -                uint8_t **   data) +ssize_t ro_read(const char * name, uint8_t ** data)  {          struct rnode * node;          ssize_t        len; @@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name,          return len;  } -ssize_t ro_children(const char * name, -                    char ***     children) +ssize_t ro_children(const char * name, char *** children)  {          struct rnode * node;          struct rnode * child; @@ -1640,8 +1530,7 @@ bool ro_exists(const char * name)          return found;  } -int ro_subscribe(const char *        name, -                 struct ro_sub_ops * ops) +int ro_subscribe(const char * name, struct ro_sub_ops * ops)  {          struct ro_sub * sub;          int sid; diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index c7e17ff6..fd25dcd9 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -932,10 +932,10 @@ static int eth_llc_ipcp_name_query(char * name)          return ret;  } -static int eth_llc_ipcp_flow_alloc(int           fd, -                                   char *        dst_name, -                                   char *        src_ae_name, -                                   enum qos_cube qos) +static int eth_llc_ipcp_flow_alloc(int       fd, +                                   char *    dst_name, +                                   char *    src_ae_name, +                                   qoscube_t qos)  {          uint8_t ssap = 0;          uint8_t r_addr[MAC_SIZE]; diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index fd321780..ea408914 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -945,10 +945,10 @@ static int ipcp_udp_name_query(char * name)          return 0;  } -static int ipcp_udp_flow_alloc(int           fd, -                               char *        dst_name, -                               char *        src_ae_name, -                               enum qos_cube qos) +static int ipcp_udp_flow_alloc(int       fd, +                               char *    dst_name, +                               char *    src_ae_name, +                               qoscube_t qos)  {          struct sockaddr_in r_saddr; /* server address */          struct sockaddr_in f_saddr; /* flow */ diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c index 29327df4..aa5e4f8a 100644 --- a/src/irmd/ipcp.c +++ b/src/irmd/ipcp.c @@ -326,12 +326,12 @@ int ipcp_name_query(pid_t api,          return ret;  } -int ipcp_flow_alloc(pid_t         api, -                    int           port_id, -                    pid_t         n_api, -                    char *        dst_name, -                    char *        src_ae_name, -                    enum qos_cube qos) +int ipcp_flow_alloc(pid_t     api, +                    int       port_id, +                    pid_t     n_api, +                    char *    dst_name, +                    char *    src_ae_name, +                    qoscube_t qos)  {          ipcp_msg_t msg = IPCP_MSG__INIT;          ipcp_msg_t * recv_msg = NULL; diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h index 67b14ece..eb278e5b 100644 --- a/src/irmd/ipcp.h +++ b/src/irmd/ipcp.h @@ -42,17 +42,20 @@ int   ipcp_bootstrap(pid_t              api,  int   ipcp_name_reg(pid_t  api,                      char * name); +  int   ipcp_name_unreg(pid_t  api,                        char * name); +  int   ipcp_name_query(pid_t api,                        char * name); -int   ipcp_flow_alloc(pid_t         api, -                      int           port_id, -                      pid_t         n_api, -                      char *        dst_name, -                      char *        src_ae_name, -                      enum qos_cube qos); +int   ipcp_flow_alloc(pid_t     api, +                      int       port_id, +                      pid_t     n_api, +                      char *    dst_name, +                      char *    src_ae_name, +                      qoscube_t qos); +  int   ipcp_flow_alloc_resp(pid_t api,                             int   port_id,                             pid_t n_api, diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 9eaca9fd..22971806 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -29,6 +29,7 @@ set(SOURCE_FILES    # Add source files here    bitmap.c    cdap.c +  cdap_req.c    dev.c    hashtable.c    irm.c diff --git a/src/lib/cdap.c b/src/lib/cdap.c index df79be54..dee8f88c 100644 --- a/src/lib/cdap.c +++ b/src/lib/cdap.c @@ -25,39 +25,65 @@  #include <ouroboros/bitmap.h>  #include <ouroboros/dev.h>  #include <ouroboros/fcntl.h> +#include <ouroboros/errno.h> + +#include "cdap_req.h"  #include <stdlib.h>  #include <pthread.h> +#include <string.h> +#include <assert.h>  #include "cdap.pb-c.h"  typedef Cdap cdap_t;  typedef Opcode opcode_t; +typedef int32_t invoke_id_t; + +#define INVALID_INVOKE_ID -1  #define IDS_SIZE 256  #define BUF_SIZE 2048  struct cdap { -        int               fd; -        struct bmp *      ids; -        pthread_mutex_t   ids_lock; -        pthread_t         reader; -        struct cdap_ops * ops; +        int              fd; + +        struct bmp *     ids; +        pthread_mutex_t  ids_lock; + +        pthread_t        reader; + +        struct list_head sent; +        pthread_rwlock_t sent_lock; + +        struct list_head rcvd; +        pthread_cond_t   rcvd_cond; +        pthread_mutex_t  rcvd_lock;  }; -struct cdap_info { -        pthread_t thread; -        struct cdap * instance; -        cdap_t * msg; +struct cdap_rcvd { +        struct list_head next; + +        invoke_id_t      iid; + +        enum cdap_opcode opcode; +        char *           name; +        uint8_t *        data; +        size_t           len; +        uint32_t         flags;  };  static int next_invoke_id(struct cdap * instance)  {          int ret; +        assert(instance); +          pthread_mutex_lock(&instance->ids_lock); +          ret = bmp_allocate(instance->ids);          if (!bmp_is_id_valid(instance->ids, ret)) -                ret = -1; /* INVALID_INVOKE_ID */ +                ret = INVALID_INVOKE_ID; +          pthread_mutex_unlock(&instance->ids_lock);          return ret; @@ -67,140 +93,225 @@ static int release_invoke_id(struct cdap * instance, int id)  {          int ret; +        assert(instance); +          pthread_mutex_lock(&instance->ids_lock); +          ret = bmp_release(instance->ids, id); +          pthread_mutex_unlock(&instance->ids_lock);          return ret;  } -static void * handle_cdap_msg(void * o) +#define cdap_sent_has_key(i, key) (cdap_sent_get_by_key(i, key) != NULL) + +struct cdap_req * cdap_sent_get_by_key(struct cdap * instance, cdap_key_t key)  { -        struct cdap_info * info = (struct cdap_info *) o; -        struct cdap * instance = info->instance; -        cdap_t * msg = info->msg; - -        switch (msg->opcode) { -        case OPCODE__READ: -                if (msg->name != NULL) -                        instance->ops->cdap_request(instance, -                                                    msg->invoke_id, -                                                    CDAP_READ, -                                                    msg->name, -                                                    NULL, 0, 0); -                break; -        case OPCODE__WRITE: -                if (msg->name != NULL && -                    msg->has_value) -                        instance->ops->cdap_request(instance, -                                                    msg->invoke_id, -                                                    CDAP_WRITE, -                                                    msg->name, -                                                    msg->value.data, -                                                    msg->value.len, -                                                    msg->flags); -                break; -        case OPCODE__CREATE: -                if (msg->name != NULL && -                    msg->has_value) -                        instance->ops->cdap_request(instance, -                                                    msg->invoke_id, -                                                    CDAP_CREATE, -                                                    msg->name, -                                                    msg->value.data, -                                                    msg->value.len, 0); -                break; -        case OPCODE__DELETE: -                if (msg->name != NULL && -                    msg->has_value) -                        instance->ops->cdap_request(instance, -                                                    msg->invoke_id, -                                                    CDAP_DELETE, -                                                    msg->name, -                                                    msg->value.data, -                                                    msg->value.len, 0); -                break; -        case OPCODE__START: -                if (msg->name != NULL) -                        instance->ops->cdap_request(instance, -                                                    msg->invoke_id, -                                                    CDAP_START, -                                                    msg->name, -                                                    NULL, 0, 0); -                break; -        case OPCODE__STOP: -                if (msg->name != NULL) -                        instance->ops->cdap_request(instance, -                                                    msg->invoke_id, -                                                    CDAP_STOP, -                                                    msg->name, -                                                    NULL, 0, 0); -                break; -        case OPCODE__REPLY: -                instance->ops->cdap_reply(instance, -                                          msg->invoke_id, -                                          msg->result, -                                          msg->value.data, -                                          msg->value.len); -                release_invoke_id(instance, msg->invoke_id); -                break; -        default: -                break; +        struct list_head * p = NULL; +        struct cdap_req *  req = NULL; + +        assert(instance); +        assert(key >= 0); + +        pthread_rwlock_rdlock(&instance->sent_lock); + +        list_for_each(p, &instance->sent) { +                req = list_entry(p, struct cdap_req, next); +                if (req->key == key) { +                        pthread_rwlock_unlock(&instance->sent_lock); +                        return req; +                }          } -        free(info); -        cdap__free_unpacked(msg, NULL); +        pthread_rwlock_unlock(&instance->sent_lock); -        return (void *) 0; +        return NULL; +} + +static int cdap_sent_add(struct cdap * instance, struct cdap_req * req) +{ +        assert (instance); +        assert (req); + +        if (cdap_sent_has_key(instance, req->key)) +                return -EPERM; + +        pthread_rwlock_wrlock(&instance->sent_lock); + +        list_add(&req->next, &instance->sent); + +        pthread_rwlock_unlock(&instance->sent_lock); + +        return 0; +} + +static void cdap_sent_del(struct cdap * instance, struct cdap_req * req) +{ +        assert(instance); +        assert(req); + +        assert(cdap_sent_has_key(instance, req->key)); + +        pthread_rwlock_wrlock(&instance->sent_lock); + +        list_del(&req->next); + +        pthread_rwlock_unlock(&instance->sent_lock); +} + +static void cdap_sent_destroy(struct cdap * instance) +{ +        struct list_head * p = NULL; +        struct list_head * h = NULL; + +        assert(instance); + +        pthread_rwlock_wrlock(&instance->sent_lock); + +        list_for_each_safe(p, h, &instance->sent) { +                struct cdap_req * req = list_entry(p, struct cdap_req, next); +                list_del(&req->next); +                cdap_req_destroy(req); +        } + +        pthread_rwlock_unlock(&instance->sent_lock); +} + +static void cdap_rcvd_destroy(struct cdap * instance) +{ +        struct list_head * p = NULL; +        struct list_head * h = NULL; + +        assert(instance); + +        pthread_mutex_lock(&instance->rcvd_lock); + +        list_for_each_safe(p, h, &instance->sent) { +                struct cdap_rcvd * r = list_entry(p, struct cdap_rcvd, next); +                list_del(&r->next); +                if (r->data != NULL) +                        free(r->data); +                if (r->name != NULL) +                        free(r->name); +                free(r); +        } + +        pthread_mutex_unlock(&instance->rcvd_lock);  }  static void * sdu_reader(void * o)  {          struct cdap * instance = (struct cdap *) o; +        struct cdap_req * req; +        struct cdap_rcvd * rcvd;          cdap_t * msg;          uint8_t buf[BUF_SIZE];          ssize_t len; -        struct cdap_info * cdap_info; +        buffer_t data;          while (true) {                  len = flow_read(instance->fd, buf, BUF_SIZE);                  if (len < 0) -                        return (void *) -1; +                        continue;                  msg = cdap__unpack(NULL, len, buf);                  if (msg == NULL)                          continue; -                cdap_info = malloc(sizeof(*cdap_info)); -                if (cdap_info == NULL) { -                        cdap__free_unpacked(msg, NULL); -                        continue; +                if (msg->opcode != OPCODE__REPLY) { +                        rcvd = malloc(sizeof(*rcvd)); +                        if (rcvd == NULL) { +                                cdap__free_unpacked(msg, NULL); +                                continue; +                        } + +                        switch (msg->opcode) { +                        case OPCODE__START: +                                rcvd->opcode = CDAP_START; +                                break; +                        case OPCODE__STOP: +                                rcvd->opcode = CDAP_STOP; +                                break; +                        case OPCODE__READ: +                                rcvd->opcode = CDAP_READ; +                                break; +                        case OPCODE__WRITE: +                                rcvd->opcode = CDAP_WRITE; +                                break; +                        case OPCODE__CREATE: +                                rcvd->opcode = CDAP_CREATE; +                                break; +                        case OPCODE__DELETE: +                                rcvd->opcode = CDAP_DELETE; +                                break; +                        default: +                                cdap__free_unpacked(msg, NULL); +                                free(rcvd); +                                continue; +                        } +                        rcvd->iid   = msg->invoke_id; +                        rcvd->flags = msg->flags; +                        rcvd->name  = strdup(msg->name); +                        if (rcvd->name == NULL) { +                                cdap__free_unpacked(msg, NULL); +                                free(rcvd); +                                continue; +                        } + +                        if (msg->has_value) { +                                rcvd->len = msg->value.len; +                                rcvd->data = malloc(rcvd->len); +                                if (rcvd->data == NULL) { +                                        cdap__free_unpacked(msg, NULL); +                                        free(rcvd); +                                        continue; +                                } +                                memcpy(rcvd->data, msg->value.data, rcvd->len); +                        } else { +                                rcvd->len = 0; +                                rcvd->data = NULL; +                        } + +                        pthread_mutex_lock(&instance->rcvd_lock); + +                        list_add(&rcvd->next, &instance->rcvd); + +                        pthread_cond_signal(&instance->rcvd_cond); +                        pthread_mutex_unlock(&instance->rcvd_lock); +                } else  { +                        req = cdap_sent_get_by_key(instance, msg->invoke_id); +                        if (req == NULL) +                                continue; + +                        if (msg->has_value) { +                                data.len = msg->value.len; +                                data.data = malloc(data.len); +                                if (data.data == NULL) { +                                        cdap__free_unpacked(msg, NULL); +                                        continue; +                                } +                                memcpy(data.data, msg->value.data, data.len); +                        } else { +                                data.len = 0; +                                data.data = NULL; +                        } + +                        cdap_req_respond(req, msg->result, data);                  } -                cdap_info->instance = instance; -                cdap_info->msg = msg; - -                pthread_create(&cdap_info->thread, -                               NULL, -                               handle_cdap_msg, -                               (void *) cdap_info); - -                pthread_detach(cdap_info->thread); - +                cdap__free_unpacked(msg, NULL);          }          return (void *) 0;  } -struct cdap * cdap_create(struct cdap_ops * ops, -                          int               fd) +struct cdap * cdap_create(int fd)  {          struct cdap * instance = NULL;          int flags; -        if (ops == NULL || fd < 0 || -            ops->cdap_reply == NULL || -            ops->cdap_request == NULL) +        if (fd < 0)                  return NULL;          flags = flow_get_flags(fd); @@ -216,19 +327,43 @@ struct cdap * cdap_create(struct cdap_ops * ops,                  return NULL;          } -        instance->ops = ops; -        instance->fd = fd; +        if (pthread_mutex_init(&instance->rcvd_lock, NULL)) { +                pthread_mutex_destroy(&instance->ids_lock); +                free(instance); +                return NULL; +        } + +        if (pthread_rwlock_init(&instance->sent_lock, NULL)) { +                pthread_mutex_destroy(&instance->rcvd_lock); +                pthread_mutex_destroy(&instance->ids_lock); +                free(instance); +                return NULL; +        } + +        if (pthread_cond_init(&instance->rcvd_cond, NULL)) { +                pthread_rwlock_destroy(&instance->sent_lock); +                pthread_mutex_destroy(&instance->rcvd_lock); +                pthread_mutex_destroy(&instance->ids_lock); +                free(instance); +                return NULL; +        }          instance->ids = bmp_create(IDS_SIZE, 0);          if (instance->ids == NULL) { +                pthread_cond_destroy(&instance->rcvd_cond); +                pthread_rwlock_destroy(&instance->sent_lock); +                pthread_mutex_destroy(&instance->rcvd_lock); +                pthread_mutex_destroy(&instance->ids_lock);                  free(instance);                  return NULL;          } -        pthread_create(&instance->reader, -                       NULL, -                       sdu_reader, -                       (void *) instance); +        INIT_LIST_HEAD(&instance->sent); +        INIT_LIST_HEAD(&instance->rcvd); + +        instance->fd = fd; + +        pthread_create(&instance->reader, NULL, sdu_reader, instance);          return instance;  } @@ -247,27 +382,37 @@ int cdap_destroy(struct cdap * instance)          pthread_mutex_unlock(&instance->ids_lock); -        flow_dealloc(instance->fd); +        pthread_mutex_destroy(&instance->ids_lock); + +        cdap_sent_destroy(instance); + +        pthread_rwlock_destroy(&instance->sent_lock); + +        cdap_rcvd_destroy(instance); + +        pthread_mutex_destroy(&instance->rcvd_lock);          free(instance);          return 0;  } -static int write_msg(struct cdap * instance, -                     cdap_t * msg) +static int write_msg(struct cdap * instance, cdap_t * msg)  {          int ret;          uint8_t * data;          size_t len; +        assert(instance); +        assert(msg); +          len = cdap__get_packed_size(msg);          if (len == 0)                  return -1; -        data = malloc(BUF_SIZE); +        data = malloc(len);          if (data == NULL) -                return -1; +                return -ENOMEM;          cdap__pack(msg, data); @@ -278,22 +423,41 @@ static int write_msg(struct cdap * instance,          return ret;  } -int cdap_send_request(struct cdap *    instance, -                      enum cdap_opcode code, -                      char *           name, -                      uint8_t *        data, -                      size_t           len, -                      uint32_t         flags) +static cdap_key_t invoke_id_to_key(invoke_id_t iid) +{ +        if (iid == INVALID_INVOKE_ID) +                return INVALID_CDAP_KEY; + +        return (cdap_key_t) iid; +} + +static invoke_id_t key_to_invoke_id(cdap_key_t key) +{ +        if (key == INVALID_CDAP_KEY) +                return INVALID_INVOKE_ID; + +        return (invoke_id_t) key; +} + +cdap_key_t cdap_request_send(struct cdap *    instance, +                             enum cdap_opcode code, +                             char *           name, +                             uint8_t *        data, +                             size_t           len, +                             uint32_t         flags)  { -        int id;          cdap_t msg = CDAP__INIT; +        struct cdap_req * req; +        invoke_id_t iid; +        cdap_key_t key;          if (instance == NULL || name == NULL) -                return -1; +                return -EINVAL; -        id = next_invoke_id(instance); -        if (!bmp_is_id_valid(instance->ids, id)) -                return -1; + +        iid = next_invoke_id(instance); +        if (iid == INVALID_INVOKE_ID) +                return INVALID_CDAP_KEY;          switch (code) {          case CDAP_READ: @@ -315,39 +479,132 @@ int cdap_send_request(struct cdap *    instance,                  msg.opcode = OPCODE__STOP;                  break;          default: -                release_invoke_id(instance, id); -                return -1; +                release_invoke_id(instance, iid); +                return -EINVAL;          }          msg.name = name;          msg.has_flags = true;          msg.flags = flags; -        msg.invoke_id = id; +        msg.invoke_id = iid;          if (data != NULL) {                  msg.has_value = true;                  msg.value.data = data;                  msg.value.len = len;          } -        if (write_msg(instance, &msg)) -                return -1; +        key = invoke_id_to_key(iid); + +        req = cdap_req_create(key); +        if (req == NULL) +                return INVALID_CDAP_KEY; + +        if (cdap_sent_add(instance, req)) { +                cdap_req_destroy(req); +                return INVALID_CDAP_KEY; +        } + +        if (write_msg(instance, &msg)) { +                cdap_sent_del(instance, req); +                cdap_req_destroy(req); +                return INVALID_CDAP_KEY; +        } + +        return key; +} + +int cdap_reply_wait(struct cdap * instance, +                    cdap_key_t    key, +                    uint8_t **    data, +                    size_t *      len) +{ +        int ret; +        struct cdap_req * r; +        invoke_id_t iid = key_to_invoke_id(key); + +        if (instance == NULL || iid == INVALID_INVOKE_ID) +                return -EINVAL; + +        r = cdap_sent_get_by_key(instance, key); +        if (r == NULL) +                return -EINVAL; + +        ret = cdap_req_wait(r); +        if (ret < 0) +                return ret; + +        if (r->response) +                return r->response; + +        assert(ret == 0); + +        if (data != NULL) { +                *data = r->data.data; +                *len  = r->data.len; +        } + +        cdap_sent_del(instance, r); -        return id; +        release_invoke_id(instance, iid); + +        return 0;  } -int cdap_send_reply(struct cdap * instance, -                    int           invoke_id, +cdap_key_t cdap_request_wait(struct cdap *      instance, +                             enum cdap_opcode * opcode, +                             char **            name, +                             uint8_t **         data, +                             size_t *           len, +                             uint32_t *         flags) +{ +        struct cdap_rcvd * rcvd; +        invoke_id_t iid; + +        if (instance == NULL || opcode == NULL || name == NULL || data == NULL +            || len == NULL || flags == NULL) +                return -EINVAL; + +        pthread_mutex_lock(&instance->rcvd_lock); + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) &instance->rcvd_lock); + +        while (list_empty(&instance->rcvd)) +                pthread_cond_wait(&instance->rcvd_cond, &instance->rcvd_lock); + +        rcvd = list_first_entry(&instance->rcvd, struct cdap_rcvd, next); + +        list_del(&rcvd->next); + +        pthread_cleanup_pop(true); + +        *opcode = rcvd->opcode; +        *name   = rcvd->name; +        *data   = rcvd->data; +        *len    = rcvd->len; +        *flags  = rcvd->flags; + +        iid = rcvd->iid; + +        free(rcvd); + +        return invoke_id_to_key(iid); +} + +int cdap_reply_send(struct cdap * instance, +                    cdap_key_t    key,                      int           result,                      uint8_t *     data,                      size_t        len)  {          cdap_t msg = CDAP__INIT; +        invoke_id_t iid = key_to_invoke_id(key);          if (instance == NULL) -                return -1; +                return -EINVAL;          msg.opcode = OPCODE__REPLY; -        msg.invoke_id = invoke_id; +        msg.invoke_id = iid;          msg.has_result = true;          msg.result = result; diff --git a/src/ipcpd/normal/cdap_request.c b/src/lib/cdap_req.c index 8409b508..02fa0846 100644 --- a/src/ipcpd/normal/cdap_request.c +++ b/src/lib/cdap_req.c @@ -1,7 +1,7 @@  /*   * Ouroboros - Copyright (C) 2016   * - * Normal IPCP - RIB Manager - CDAP request + * CDAP - CDAP request management   *   *    Sander Vrijders   <sander.vrijders@intec.ugent.be>   *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> @@ -25,27 +25,25 @@  #include <ouroboros/time_utils.h>  #include <ouroboros/errno.h> -#include "cdap_request.h" +#include "cdap_req.h"  #include <stdlib.h> +#include <assert.h> -struct cdap_request * cdap_request_create(enum cdap_opcode code, -                                          char *           name, -                                          int              invoke_id, -                                          struct cdap *    instance) +struct cdap_req * cdap_req_create(cdap_key_t key)  { -        struct cdap_request * creq = malloc(sizeof(*creq)); +        struct cdap_req * creq = malloc(sizeof(*creq));          pthread_condattr_t cattr;          if (creq == NULL)                  return NULL; -        creq->code = code; -        creq->name = name; -        creq->invoke_id = invoke_id; -        creq->instance = instance; -        creq->state = REQ_INIT; -        creq->result = -1; +        creq->key = key; +        creq->state     = REQ_INIT; + +        creq->response = -1; +        creq->data.data = NULL; +        creq->data.len  = 0;          pthread_condattr_init(&cattr);  #ifndef __APPLE__ @@ -56,13 +54,14 @@ struct cdap_request * cdap_request_create(enum cdap_opcode code,          INIT_LIST_HEAD(&creq->next); +        clock_gettime(PTHREAD_COND_CLOCK, &creq->birth); +          return creq;  } -void cdap_request_destroy(struct cdap_request * creq) +void cdap_req_destroy(struct cdap_req * creq)  { -        if (creq == NULL) -                return; +        assert(creq);          pthread_mutex_lock(&creq->lock); @@ -87,24 +86,19 @@ void cdap_request_destroy(struct cdap_request * creq)          pthread_cond_destroy(&creq->cond);          pthread_mutex_destroy(&creq->lock); -        if (creq->name != NULL) -                free(creq->name); -          free(creq);  } -int cdap_request_wait(struct cdap_request * creq) +int cdap_req_wait(struct cdap_req * creq)  {          struct timespec timeout = {(CDAP_REPLY_TIMEOUT / 1000),                                     (CDAP_REPLY_TIMEOUT % 1000) * MILLION};          struct timespec abstime;          int ret = -1; -        if (creq == NULL) -                return -EINVAL; +        assert(creq); -        clock_gettime(CLOCK_REALTIME, &abstime); -        ts_add(&abstime, &timeout, &abstime); +        ts_add(&creq->birth, &timeout, &abstime);          pthread_mutex_lock(&creq->lock); @@ -118,9 +112,8 @@ int cdap_request_wait(struct cdap_request * creq)          while (creq->state == REQ_PENDING) {                  if ((ret = -pthread_cond_timedwait(&creq->cond,                                                     &creq->lock, -                                                   &abstime)) == -ETIMEDOUT) { +                                                   &abstime)) == -ETIMEDOUT)                          break; -                }          }          if (creq->state == REQ_DESTROY) @@ -134,10 +127,9 @@ int cdap_request_wait(struct cdap_request * creq)          return ret;  } -void cdap_request_respond(struct cdap_request * creq, int response) +void cdap_req_respond(struct cdap_req * creq, int response, buffer_t data)  { -        if (creq == NULL) -                return; +        assert(creq);          pthread_mutex_lock(&creq->lock); @@ -146,8 +138,10 @@ void cdap_request_respond(struct cdap_request * creq, int response)                  return;          } -        creq->state = REQ_RESPONSE; -        creq->result = response; +        creq->state    = REQ_RESPONSE; +        creq->response = response; +        creq->data     = data; +          pthread_cond_broadcast(&creq->cond);          while (creq->state == REQ_RESPONSE) diff --git a/src/ipcpd/normal/cdap_request.h b/src/lib/cdap_req.h index 9cccfda5..714744ab 100644 --- a/src/ipcpd/normal/cdap_request.h +++ b/src/lib/cdap_req.h @@ -1,7 +1,7 @@  /*   * Ouroboros - Copyright (C) 2016   * - * Normal IPCP - RIB Manager - CDAP request + * CDAP - CDAP request management   *   *    Sander Vrijders   <sander.vrijders@intec.ugent.be>   *    Dimitri Staessens <dimitri.staessens@intec.ugent.be> @@ -21,12 +21,13 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#ifndef OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H -#define OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H +#ifndef OUROBOROS_CDAP_REQ_H +#define OUROBOROS_CDAP_REQ_H  #include <ouroboros/config.h>  #include <ouroboros/cdap.h>  #include <ouroboros/list.h> +#include <ouroboros/utils.h>  #include <pthread.h> @@ -38,31 +39,31 @@ enum creq_state {          REQ_DESTROY  }; -struct cdap_request { +struct cdap_req {          struct list_head next; -        enum cdap_opcode code; -        char *           name; -        int              invoke_id; -        struct cdap *    instance; +        struct timespec  birth; -        int              result; +        cdap_key_t       key; + +        int              response; +        buffer_t         data;          enum creq_state  state;          pthread_cond_t   cond;          pthread_mutex_t  lock;  }; -struct cdap_request * cdap_request_create(enum cdap_opcode code, -                                          char *           name, -                                          int              invoke_id, -                                          struct cdap *    instance); +struct cdap_req * cdap_req_create(cdap_key_t key); + +void              cdap_req_destroy(struct cdap_req * creq); -void                  cdap_request_destroy(struct cdap_request * creq); +int               cdap_req_wait(struct cdap_req * creq); -int                   cdap_request_wait(struct cdap_request * creq); +void              cdap_req_respond(struct cdap_req * creq, +                                   int               response, +                                   buffer_t          data); -void                  cdap_request_respond(struct cdap_request * creq, -                                           int                   response); +enum creq_state   cdap_req_get_state(struct cdap_req * creq); -#endif /* OUROBOROS_IPCPD_NORMAL_CDAP_REQUEST_H */ +#endif /* OUROBOROS_CDAP_REQ_H */ diff --git a/src/lib/dev.c b/src/lib/dev.c index bad56129..20976375 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -134,7 +134,7 @@ struct flow {          struct shm_flow_set * set;          int                   port_id;          int                   oflags; -        enum qos_cube         qos; +        qoscube_t             qos;          pid_t                 api; @@ -654,9 +654,8 @@ int flow_dealloc(int fd)          pthread_rwlock_unlock(&ai.data_lock);          recv_msg = send_recv_irm_msg_b(&msg); -        if (recv_msg == NULL) { +        if (recv_msg == NULL)                  return -1; -        }          if (!recv_msg->has_result) {                  irm_msg__free_unpacked(recv_msg, NULL); @@ -1435,11 +1434,10 @@ int ipcp_flow_fini(int fd)          pthread_rwlock_unlock(&ai.data_lock);          shm_rbuff_fini(rb); -          return 0;  } -int ipcp_flow_get_qoscube(int fd, enum qos_cube * cube) +int ipcp_flow_get_qoscube(int fd, qoscube_t * cube)  {          if (fd < 0 || fd >= AP_MAX_FLOWS || cube == NULL)                  return -EINVAL; | 
