diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 1144 | ||||
| -rw-r--r-- | src/ipcpd/normal/ro.h | 33 | ||||
| -rw-r--r-- | src/ipcpd/normal/ro.proto | 9 | 
4 files changed, 798 insertions, 392 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 08c5c691..06e41e9c 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -20,6 +20,8 @@ protobuf_generate_c(STATIC_INFO_SRCS STATIC_INFO_HDRS  protobuf_generate_c(FLOW_ALLOC_SRCS FLOW_ALLOC_HDRS    flow_alloc.proto) +protobuf_generate_c(RO_SRCS RO_HDRS ro.proto) +  set(SOURCE_FILES          # Add source files here          addr_auth.c @@ -34,7 +36,7 @@ set(SOURCE_FILES  )  add_executable (ipcpd-normal ${SOURCE_FILES} ${IPCP_SOURCES} -  ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS}) +  ${STATIC_INFO_SRCS} ${FLOW_ALLOC_SRCS} ${RO_SRCS})  target_link_libraries (ipcpd-normal LINK_PUBLIC ouroboros)  include(MacroAddCompileFlags) diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 96c7e7e0..49971eda 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -47,15 +47,22 @@  #include "static_info.pb-c.h"  typedef StaticInfoMsg static_info_msg_t; +#include "ro.pb-c.h" +typedef RoMsg ro_msg_t; +  #define SUBS_SIZE 25  #define ENROLLMENT     "enrollment" -#define STATIC_INFO    "static DIF information" + +#define RIBMGR_PREFIX  "/ribmgr" +#define STAT_INFO      "/statinfo"  #define PATH_DELIMITER "/"  /* RIB objects */  struct rnode {          char *            name; +        char *            full_name; +        uint64_t          seqno;          /*           * NOTE: Naive implementation for now, could be replaced by @@ -92,6 +99,7 @@ struct {          struct list_head   subs;          struct bmp *       sids;          pthread_mutex_t    subs_lock; +        int                ribmgr_sid;          struct dt_const    dtc; @@ -106,6 +114,296 @@ struct {          struct addr_auth * addr_auth;  } rib; +int ribmgr_ro_created(const char * name, +                      uint8_t *    data, +                      size_t       len) +{ +        static_info_msg_t * stat_msg; + +        pthread_rwlock_wrlock(&ipcpi.state_lock); +        if (ipcp_get_state() == IPCP_PENDING_ENROLL && +            strcmp(name, RIBMGR_PREFIX STAT_INFO) == 0) { +                LOG_DBG("Received static DIF information."); + +                stat_msg = static_info_msg__unpack(NULL, len, data); +                if (stat_msg == NULL) { +                        ipcp_set_state(IPCP_INIT); +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        LOG_ERR("Failed to unpack static info message."); +                        return -1; +                } + +                rib.dtc.addr_size = stat_msg->addr_size; +                rib.dtc.cep_id_size = stat_msg->cep_id_size; +                rib.dtc.pdu_length_size = stat_msg->pdu_length_size; +                rib.dtc.seqno_size = stat_msg->seqno_size; +                rib.dtc.has_ttl = stat_msg->has_ttl; +                rib.dtc.has_chk = stat_msg->has_chk; +                rib.dtc.min_pdu_size = stat_msg->min_pdu_size; +                rib.dtc.max_pdu_size = stat_msg->max_pdu_size; + +                rib.addr_auth = addr_auth_create(stat_msg->addr_auth_type); +                if (rib.addr_auth == NULL) { +                        ipcp_set_state(IPCP_INIT); +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        static_info_msg__free_unpacked(stat_msg, NULL); +                        LOG_ERR("Failed to create address authority"); +                        return -1; +                } + +                rib.address = rib.addr_auth->address(); +                LOG_DBG("IPCP has address %lu", rib.address); + +                if (frct_init()) { +                        ipcp_set_state(IPCP_INIT); +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        static_info_msg__free_unpacked(stat_msg, NULL); +                        LOG_ERR("Failed to init FRCT"); +                        return -1; +                } + +                static_info_msg__free_unpacked(stat_msg, NULL); +        } +        pthread_rwlock_unlock(&ipcpi.state_lock); + +        return 0; +} + +/* We only have a create operation for now */ +static struct ro_sub_ops ribmgr_sub_ops = { +        .ro_created = ribmgr_ro_created, +        .ro_updated = NULL, +        .ro_deleted = NULL +}; + +static struct rnode * find_rnode_by_name(const char * name) +{ +        char * str; +        char * str1; +        char * saveptr; +        char * token; +        struct rnode * node; + +        str = strdup(name); +        if (str == NULL) +                return NULL; + +        node = rib.root; + +        for (str1 = str; ; str1 = NULL) { +                token = strtok_r(str1, PATH_DELIMITER, &saveptr); +                if (token == NULL) +                        break; + +                node = node->child; + +                while (node != NULL) +                        if (strcmp(node->name, token) == 0) +                                break; +                        else +                                node = node->sibling; + +                if (node == NULL) { +                        free(str); +                        return NULL; +                } +        } + +        free(str); +        return node; +} + +/* Call under RIB object lock */ +static int ro_msg_create(struct rnode * node, +                         ro_msg_t *     msg) +{ +        msg->address = rib.address; +        msg->seqno = node->seqno; +        msg->recv_set = node->props->recv_set; +        msg->enrol_sync = node->props->enrol_sync; +        msg->sec = node->props->expiry.tv_sec; +        msg->nsec = node->props->expiry.tv_nsec; +        msg->value.data = node->data; +        msg->value.len = node->len; + +        return 0; +} + +static struct rnode * ribmgr_ro_create(const char *      name, +                                       struct ro_props * props, +                                       uint8_t *         data, +                                       size_t            len) +{ +        char * str; +        char * str1; +        char * saveptr; +        char * token; +        char * token2; +        struct rnode * node; +        struct rnode * new; +        struct rnode * prev; +        bool sibling; + +        str = strdup(name); +        if (str == NULL) +                return NULL; + +        node = rib.root; + +        for (str1 = str; ; str1 = NULL) { +                token = strtok_r(str1, PATH_DELIMITER, &saveptr); +                if (token == NULL) { +                        LOG_ERR("RO already exists."); +                        free(str); +                        return NULL; +                } + +                prev = node; +                node = node->child; +                sibling = false; + +                /* Search horizontally */ +                while (node != NULL) { +                        if (strcmp(node->name, token) == 0) { +                                break; +                        } else { +                                prev = node; +                                node = node->sibling; +                                sibling = true; +                        } +                } + +                if (node == NULL) +                        break; +        } + +        token2 = strtok_r(NULL, PATH_DELIMITER, &saveptr); +        if (token2 != NULL) { +                LOG_ERR("Part of the pathname does not exist."); +                free(str); +                return NULL; +        } + +        new = malloc(sizeof(*new)); +        if (new == NULL) { +                free(str); +                return NULL; +        } + +        new->name = strdup(token); +        if (new->name == NULL) { +                free(str); +                free(new); +                return NULL; +        } + +        free(str); + +        new->full_name = strdup(name); +        if (new->full_name == NULL) { +                free(new); +                return NULL; +        } + +        new->seqno = 0; +        new->props = props; + +        if (sibling) +                prev->sibling = new; +        else +                prev->child = new; + +        new->data = data; +        new->len = len; +        new->child = NULL; +        new->sibling = NULL; + +        return new; +} + +static int ribmgr_ro_delete(const char * name) +{ +        char * str; +        char * str1; +        char * saveptr; +        char * token; +        struct rnode * node; +        struct rnode * prev; +        bool sibling = false; + +        str = strdup(name); +        if (str == NULL) +                return -1; + +        node = rib.root; +        prev = NULL; + +        for (str1 = str; ; str1 = NULL) { +                token = strtok_r(str1, PATH_DELIMITER, &saveptr); +                if (token == NULL) +                        break; + +                prev = node; +                node = node->child; +                sibling = false; + +                while (node != NULL) { +                        if (strcmp(node->name, token) == 0) { +                                break; +                        } else { +                                prev = node; +                                node = node->sibling; +                                sibling = true; +                        } +                } + +                if (node == NULL) { +                        free(str); +                        return -1; +                } +        } + +        if (node == rib.root) { +                LOG_ERR("Won't remove root."); +                free(str); +                return -1; +        } + +        free(node->name); +        free(node->full_name); +        if (node->data != NULL) +                free(node->data); + +        if (sibling) +                prev->sibling = node->sibling; +        else +                prev->child = node->sibling; + +        free(node); +        free(str); + +        return 0; +} + +static struct rnode * ribmgr_ro_write(const char * name, +                                      uint8_t *    data, +                                      size_t       len) +{ +        struct rnode * node; + +        node = find_rnode_by_name(name); +        if (node == NULL) +                return NULL; + +        free(node->data); + +        node->data = data; +        node->len = len; +        node->seqno++; + +        return node; +} +  /* Call while holding cdap_reqs_lock */  /* FIXME: better not to call blocking functions under any lock */  int cdap_result_wait(struct cdap * instance, @@ -151,6 +449,46 @@ int cdap_result_wait(struct cdap * instance,          return ret;  } +static int write_ro_msg(struct cdap *    neighbor, +                        ro_msg_t *       msg, +                        char *           name, +                        enum cdap_opcode code) +{ +        uint8_t * data; +        size_t len; +        int iid = 0; + +        len = ro_msg__get_packed_size(msg); +        if (len == 0) +                return -1; + +        data = malloc(len); +        if (data == NULL) +                return -ENOMEM; + +        ro_msg__pack(msg, data); + +        pthread_mutex_lock(&rib.cdap_reqs_lock); +        iid = cdap_send_request(neighbor, code, +                                name, data, len, 0); +        if (iid < 0) { +                pthread_mutex_unlock(&rib.cdap_reqs_lock); +                free(data); +                return -1; +        } + +        if (cdap_result_wait(neighbor, code, name, iid)) { +                pthread_mutex_unlock(&rib.cdap_reqs_lock); +                free(data); +                LOG_ERR("Remote did not receive RIB object."); +                return -1; +        } +        pthread_mutex_unlock(&rib.cdap_reqs_lock); +        free(data); + +        return 0; +} +  int ribmgr_init()  {          INIT_LIST_HEAD(&rib.flows); @@ -206,6 +544,18 @@ int ribmgr_init()                  return -1;          } +        rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops); +        if (rib.ribmgr_sid < 0) { +                LOG_ERR("Failed to subscribe."); +                bmp_destroy(rib.sids); +                pthread_rwlock_destroy(&rib.flows_lock); +                pthread_mutex_destroy(&rib.cdap_reqs_lock); +                pthread_mutex_destroy(&rib.ro_lock); +                pthread_mutex_destroy(&rib.subs_lock); +                free(rib.root); +                return -1; +        } +          return 0;  } @@ -247,6 +597,8 @@ int ribmgr_fini()          }          pthread_rwlock_unlock(&rib.flows_lock); +        ro_unsubscribe(rib.ribmgr_sid); +          if (rib.addr_auth != NULL)                  addr_auth_destroy(rib.addr_auth); @@ -273,6 +625,7 @@ static int ribmgr_cdap_reply(struct cdap * instance,  {          struct list_head * pos, * n = NULL; +        /* We never perform reads on other RIBs */          (void) data;          (void) len; @@ -295,7 +648,6 @@ static int ribmgr_cdap_reply(struct cdap * instance,                          pthread_mutex_unlock(&rib.cdap_reqs_lock); -                        /* FIXME: In case of a read, update values here */                          cdap_request_respond(req, result);                          pthread_mutex_lock(&rib.cdap_reqs_lock); @@ -306,69 +658,184 @@ static int ribmgr_cdap_reply(struct cdap * instance,          return 0;  } +static int ribmgr_cdap_create(struct cdap * instance, +                              int           invoke_id, +                              char *        name, +                              ro_msg_t *    msg) +{ +        int ret = 0; +        struct list_head * p = NULL; +        size_t len_s, len_n; +        uint8_t * ro_data; +        struct ro_props * props; +        struct rnode * node; + +        props = malloc(sizeof(*props)); +        if (props == NULL) { +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                return -ENOMEM; +        } + +        props->expiry.tv_sec = msg->sec; +        props->expiry.tv_nsec = msg->nsec; +        props->enrol_sync = msg->enrol_sync; + +        pthread_mutex_lock(&rib.ro_lock); + +        ro_data = malloc(msg->value.len); +        if (ro_data == NULL) { +                pthread_mutex_unlock(&rib.ro_lock); +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                free(props); +                return -1; +        } +        memcpy(ro_data, msg->value.data, msg->value.len); + +        node = ribmgr_ro_create(name, props, ro_data, msg->value.len); +        if (node == NULL) { +                pthread_mutex_unlock(&rib.ro_lock); +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                free(props); +                free(ro_data); +                return -1; +        } + +        pthread_mutex_lock(&rib.subs_lock); +        list_for_each(p, &rib.subs) { +                struct ro_sub * e = list_entry(p, struct ro_sub, next); +                len_s = strlen(e->name); +                len_n = strlen(name); + +                if (len_n < len_s) +                        continue; + +                if (memcmp(name, e->name, len_s) == 0) { +                        if (e->ops->ro_created == NULL) +                                continue; + +                        ro_data = malloc(node->len); +                        if (ro_data == NULL) +                                continue; + +                        memcpy(ro_data, node->data, node->len); +                        e->ops->ro_created(name, ro_data, node->len); +                } +        } + +        pthread_mutex_unlock(&rib.subs_lock); +        pthread_mutex_unlock(&rib.ro_lock); + +        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +                LOG_ERR("Failed to send reply to create request."); +                return -1; +        } + +        return 0; +} + +static int ribmgr_cdap_delete(struct cdap * instance, +                              int           invoke_id, +                              char *        name) +{ +        struct list_head * p = NULL; +        size_t len_s; +        size_t len_n; + +        pthread_mutex_lock(&rib.ro_lock); + +        if (ribmgr_ro_delete(name)) { +                pthread_mutex_unlock(&rib.ro_lock); +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                return -1; +        } + +        pthread_mutex_lock(&rib.subs_lock); + +        list_for_each(p, &rib.subs) { +                struct ro_sub * e = list_entry(p, struct ro_sub, next); +                len_s = strlen(e->name); +                len_n = strlen(name); + +                if (len_n < len_s) +                        continue; + +                if (memcmp(name, e->name, len_s) == 0) { +                        if (e->ops->ro_deleted == NULL) +                                continue; + +                        e->ops->ro_deleted(name); +                } +        } + +        pthread_mutex_unlock(&rib.subs_lock); +        pthread_mutex_unlock(&rib.ro_lock); + +        if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +                LOG_ERR("Failed to send reply to create request."); +                return -1; +        } + +        return 0; +} +  static int ribmgr_cdap_write(struct cdap * instance,                               int           invoke_id,                               char *        name, -                             uint8_t *     data, -                             size_t        len, +                             ro_msg_t *    msg,                               uint32_t      flags)  { -        static_info_msg_t * msg;          int ret = 0; +        struct list_head * p = NULL; +        size_t len_s; +        size_t len_n; +        uint8_t * ro_data; +        struct rnode * node;          (void) flags; -        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_PENDING_ENROLL && -            strcmp(name, STATIC_INFO) == 0) { -                LOG_DBG("Received static DIF information."); +        pthread_mutex_lock(&rib.ro_lock); -                msg = static_info_msg__unpack(NULL, len, data); -                if (msg == NULL) { -                        ipcp_set_state(IPCP_INIT); -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                        LOG_ERR("Failed to unpack static info message."); -                        return -1; -                } +        ro_data = malloc(msg->value.len); +        if (ro_data == NULL) { +                pthread_mutex_unlock(&rib.ro_lock); +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                return -1; +        } +        memcpy(ro_data, msg->value.data, msg->value.len); -                rib.dtc.addr_size = msg->addr_size; -                rib.dtc.cep_id_size = msg->cep_id_size; -                rib.dtc.pdu_length_size = msg->pdu_length_size; -                rib.dtc.seqno_size = msg->seqno_size; -                rib.dtc.has_ttl = msg->has_ttl; -                rib.dtc.has_chk = msg->has_chk; -                rib.dtc.min_pdu_size = msg->min_pdu_size; -                rib.dtc.max_pdu_size = msg->max_pdu_size; +        node = ribmgr_ro_write(name, msg->value.data, msg->value.len); +        if (node == NULL) { +                pthread_mutex_unlock(&rib.ro_lock); +                free(ro_data); +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                return -1; +        } -                rib.addr_auth = addr_auth_create(msg->addr_auth_type); -                if (rib.addr_auth == NULL) { -                        ipcp_set_state(IPCP_INIT); -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                        static_info_msg__free_unpacked(msg, NULL); -                        LOG_ERR("Failed to create address authority"); -                        return -1; -                } +        pthread_mutex_lock(&rib.subs_lock); -                rib.address = rib.addr_auth->address(); -                LOG_DBG("IPCP has address %lu", rib.address); +        list_for_each(p, &rib.subs) { +                struct ro_sub * e = list_entry(p, struct ro_sub, next); +                len_s = strlen(e->name); +                len_n = strlen(name); -                if (frct_init()) { -                        ipcp_set_state(IPCP_INIT); -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                        static_info_msg__free_unpacked(msg, NULL); -                        LOG_ERR("Failed to init FRCT"); -                        return -1; -                } +                if (len_n < len_s) +                        continue; -                static_info_msg__free_unpacked(msg, NULL); -        } else { -                ret = -1; +                if (memcmp(name, e->name, len_s) == 0) { +                        if (e->ops->ro_updated == NULL) +                                continue; + +                        ro_data = malloc(node->len); +                        if (ro_data == NULL) +                                continue; + +                        memcpy(ro_data, node->data, node->len); +                        e->ops->ro_updated(name, ro_data, node->len); +                }          } -        pthread_rwlock_unlock(&ipcpi.state_lock); +        pthread_mutex_unlock(&rib.subs_lock); +        pthread_mutex_unlock(&rib.ro_lock);          if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to write request."); @@ -378,13 +845,39 @@ static int ribmgr_cdap_write(struct cdap * instance,          return 0;  } +static int ribmgr_enrol_sync(struct cdap * instance, +                             struct rnode * node) +{ +        int ret = 0; + +        if (node != NULL) { +                if (node->props->enrol_sync == true) { +                        ro_msg_t msg = RO_MSG__INIT; + +                        if (ro_msg_create(node, &msg)) { +                                LOG_ERR("Failed to create RO msg."); +                                return -1; +                        } + +                        if (write_ro_msg(instance, &msg, +                                         node->full_name, CDAP_CREATE)) { +                                LOG_ERR("Failed to send RO msg."); +                                return -1; +                        } +                } + +                ret = ribmgr_enrol_sync(instance, node->child); +                if (ret == 0) +                        ret = ribmgr_enrol_sync(instance, node->sibling); +        } + +        return ret; +} +  static int ribmgr_cdap_start(struct cdap * instance,                               int           invoke_id,                               char *        name)  { -        static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; -        uint8_t * data = NULL; -        size_t len = 0;          int iid = 0;          pthread_rwlock_wrlock(&ipcpi.state_lock); @@ -398,57 +891,17 @@ static int ribmgr_cdap_start(struct cdap * instance,                          return -1;                  } -                stat_info.addr_size = rib.dtc.addr_size; -                stat_info.cep_id_size = rib.dtc.cep_id_size; -                stat_info.pdu_length_size = rib.dtc.pdu_length_size; -                stat_info.seqno_size = rib.dtc.seqno_size; -                stat_info.has_ttl = rib.dtc.has_ttl; -                stat_info.has_chk = rib.dtc.has_chk; -                stat_info.min_pdu_size  = rib.dtc.min_pdu_size; -                stat_info.max_pdu_size = rib.dtc.max_pdu_size; -                stat_info.addr_auth_type = rib.addr_auth->type; - -                len = static_info_msg__get_packed_size(&stat_info); -                if (len == 0) { -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        LOG_ERR("Failed to get size of static information."); -                        return -1; -                } - -                data = malloc(len); -                if (data == NULL) { -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        LOG_ERR("Failed to allocate memory."); -                        return -1; -                } - -                static_info_msg__pack(&stat_info, data); - -                LOG_DBGF("Sending static info..."); - -                pthread_mutex_lock(&rib.cdap_reqs_lock); - -                iid = cdap_send_request(instance, CDAP_WRITE, -                                        STATIC_INFO, data, len, 0); -                if (iid < 0) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        free(data); -                        LOG_ERR("Failed to send static information."); -                        return -1; -                } +                /* Loop through rtree and send correct objects */ +                LOG_DBGF("Sending ROs that need to be sent on enrolment..."); -                if (cdap_result_wait(instance, CDAP_WRITE, -                                     STATIC_INFO, iid)) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                pthread_mutex_lock(&rib.ro_lock); +                if (ribmgr_enrol_sync(instance, rib.root->child)) { +                        pthread_mutex_unlock(&rib.ro_lock);                          pthread_rwlock_unlock(&ipcpi.state_lock); -                        free(data); -                        LOG_ERR("Remote did not receive static information."); +                        LOG_ERR("Failed to sync part of the RIB.");                          return -1;                  } -                pthread_mutex_unlock(&rib.cdap_reqs_lock); - -                /* FIXME: Send neighbors here. */ +                pthread_mutex_unlock(&rib.ro_lock);                  LOG_DBGF("Sending stop enrollment..."); @@ -459,7 +912,6 @@ static int ribmgr_cdap_start(struct cdap * instance,                  if (iid < 0) {                          pthread_mutex_unlock(&rib.cdap_reqs_lock);                          pthread_rwlock_unlock(&ipcpi.state_lock); -                        free(data);                          LOG_ERR("Failed to send stop of enrollment.");                          return -1;                  } @@ -468,13 +920,10 @@ static int ribmgr_cdap_start(struct cdap * instance,                                       ENROLLMENT, iid)) {                          pthread_mutex_unlock(&rib.cdap_reqs_lock);                          pthread_rwlock_unlock(&ipcpi.state_lock); -                        free(data);                          LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  }                  pthread_mutex_unlock(&rib.cdap_reqs_lock); - -                free(data);          } else {                  if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) {                          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -520,27 +969,75 @@ static int ribmgr_cdap_request(struct cdap *    instance,                                 size_t           len,                                 uint32_t         flags)  { -        switch (opcode) { -        case CDAP_WRITE: -                return ribmgr_cdap_write(instance, -                                         invoke_id, -                                         name, data, -                                         len, flags); -        case CDAP_START: +        ro_msg_t * msg; +        int ret = -1; +        struct list_head * p = NULL; + +        if (opcode == CDAP_START)                  return ribmgr_cdap_start(instance,                                           invoke_id,                                           name); -        case CDAP_STOP: +        else if (opcode == CDAP_STOP)                  return ribmgr_cdap_stop(instance,                                          invoke_id,                                          name); -        default: -                LOG_INFO("Unsupported CDAP opcode received."); + +        msg = ro_msg__unpack(NULL, len, data); +        if (msg == NULL) { +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                LOG_ERR("Failed to unpack RO message");                  return -1;          } + +        /* FIXME: Check if we already received this */ + +        if (opcode == CDAP_CREATE) { +                ret = ribmgr_cdap_create(instance, +                                         invoke_id, +                                         name, +                                         msg); +        } else if (opcode == CDAP_WRITE) { +                ret = ribmgr_cdap_write(instance, +                                        invoke_id, +                                        name, msg, +                                        flags); + +        } else if (opcode == CDAP_DELETE) { +                ret = ribmgr_cdap_delete(instance, +                                         invoke_id, +                                         name); +        } else { +                LOG_INFO("Unsupported opcode received."); +                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                return -1; +        } + +        if (msg->recv_set == ALL_MEMBERS) { +                pthread_rwlock_rdlock(&rib.flows_lock); +                list_for_each(p, &rib.flows) { +                        struct mgmt_flow * e = +                                list_entry(p, struct mgmt_flow, next); + +                        /* Don't send it back */ +                        if (e->instance == instance) +                                continue; + +                        if (write_ro_msg(e->instance, msg, name, opcode)) { +                                LOG_ERR("Failed to send to a neighbor."); +                                pthread_rwlock_unlock(&rib.flows_lock); +                                ro_msg__free_unpacked(msg, NULL); +                                return -1; +                        } +                } +                pthread_rwlock_unlock(&rib.flows_lock); +        } + +        ro_msg__free_unpacked(msg, NULL); + +        return ret;  } -static struct cdap_ops ribmgr_ops = { +static struct cdap_ops ribmgr_cdap_ops = {          .cdap_reply   = ribmgr_cdap_reply,          .cdap_request = ribmgr_cdap_request  }; @@ -555,7 +1052,7 @@ int ribmgr_add_flow(int fd)          if (flow == NULL)                  return -1; -        instance = cdap_create(&ribmgr_ops, fd); +        instance = cdap_create(&ribmgr_cdap_ops, fd);          if (instance == NULL) {                  LOG_ERR("Failed to create CDAP instance");                  free(flow); @@ -630,24 +1127,92 @@ int ribmgr_remove_flow(int fd)  int ribmgr_bootstrap(struct dif_config * conf)  { +        static_info_msg_t stat_info = STATIC_INFO_MSG__INIT; +        uint8_t * data = NULL; +        size_t len = 0; +        struct ro_props * props; +          if (conf == NULL ||              conf->type != IPCP_NORMAL) {                  LOG_ERR("Bad DIF configuration.");                  return -1;          } -        rib.dtc.addr_size = conf->addr_size; -        rib.dtc.cep_id_size  = conf->cep_id_size; -        rib.dtc.pdu_length_size = conf->pdu_length_size; -        rib.dtc.seqno_size = conf->seqno_size; -        rib.dtc.has_ttl = conf->has_ttl; -        rib.dtc.has_chk = conf->has_chk; -        rib.dtc.min_pdu_size = conf->min_pdu_size; -        rib.dtc.max_pdu_size = conf->max_pdu_size; +        props = malloc(sizeof(*props)); +        if (props == NULL) { +                LOG_ERR("Failed to allocate memory."); +                return -1; +        } + +        props->enrol_sync = true; +        props->recv_set = NEIGHBORS; +        props->expiry.tv_sec = 0; +        props->expiry.tv_nsec = 0; + +        if (ribmgr_ro_create(RIBMGR_PREFIX, props, NULL, 0) == NULL) { +                LOG_ERR("Failed to create RIBMGR RO."); +                free(props); +                return -1; +        } + +        stat_info.addr_size = rib.dtc.addr_size = conf->addr_size; +        stat_info.cep_id_size = rib.dtc.cep_id_size  = conf->cep_id_size; +        stat_info.pdu_length_size = rib.dtc.pdu_length_size +                = conf->pdu_length_size; +        stat_info.seqno_size = rib.dtc.seqno_size = conf->seqno_size; +        stat_info.has_ttl = rib.dtc.has_ttl = conf->has_ttl; +        stat_info.has_chk = rib.dtc.has_chk = conf->has_chk; +        stat_info.min_pdu_size = rib.dtc.min_pdu_size = conf->min_pdu_size; +        stat_info.max_pdu_size = rib.dtc.max_pdu_size = conf->max_pdu_size;          rib.addr_auth = addr_auth_create(conf->addr_auth_type);          if (rib.addr_auth == NULL) {                  LOG_ERR("Failed to create address authority."); +                ro_delete(RIBMGR_PREFIX); +                return -1; +        } + +        stat_info.addr_auth_type = rib.addr_auth->type; + +        len = static_info_msg__get_packed_size(&stat_info); +        if (len == 0) { +                LOG_ERR("Failed to get size of static information."); +                addr_auth_destroy(rib.addr_auth); +                ribmgr_ro_delete(RIBMGR_PREFIX); +                return -1; +        } + +        data = malloc(len); +        if (data == NULL) { +                LOG_ERR("Failed to allocate memory."); +                addr_auth_destroy(rib.addr_auth); +                ribmgr_ro_delete(RIBMGR_PREFIX); +                return -1; +        } + +        static_info_msg__pack(&stat_info, data); + +        props = malloc(sizeof(*props)); +        if (props == NULL) { +                LOG_ERR("Failed to allocate memory."); +                free(data); +                addr_auth_destroy(rib.addr_auth); +                ribmgr_ro_delete(RIBMGR_PREFIX); +                return -1; +        } + +        props->enrol_sync = true; +        props->recv_set = NEIGHBORS; +        props->expiry.tv_sec = 0; +        props->expiry.tv_nsec = 0; + +        if (ribmgr_ro_create(RIBMGR_PREFIX STAT_INFO, +                             props, data, len) == NULL) { +                LOG_ERR("Failed to create static info RO."); +                free(data); +                free(props); +                addr_auth_destroy(rib.addr_auth); +                ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -656,7 +1221,9 @@ int ribmgr_bootstrap(struct dif_config * conf)          if (frct_init()) {                  LOG_ERR("Failed to initialize FRCT."); +                ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO);                  addr_auth_destroy(rib.addr_auth); +                ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -675,109 +1242,64 @@ uint64_t ribmgr_address()          return rib.address;  } -int ro_create(const char * name, -              uint8_t *    data, -              size_t       len) +static int send_neighbors_ro(char *           name, +                             ro_msg_t *       msg, +                             enum cdap_opcode code)  { -        char * str, * str1, * saveptr, * token; -        struct rnode * node, * new, * prev; -        bool sibling;          struct list_head * p = NULL; -        size_t len_s, len_n; -        uint8_t * ro_data; - -        str = strdup(name); -        if (str == NULL) -                return -1; -        pthread_mutex_lock(&rib.ro_lock); -        node = rib.root; +        pthread_rwlock_rdlock(&rib.flows_lock); +        list_for_each(p, &rib.flows) { +                struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); -        for (str1 = str; ; str1 = NULL) { -                token = strtok_r(str1, PATH_DELIMITER, &saveptr); -                if (token == NULL) { -                        pthread_mutex_unlock(&rib.ro_lock); -                        LOG_ERR("RO already exists."); -                        free(str); +                if (write_ro_msg(e->instance, msg, name, code)) { +                        LOG_ERR("Failed to send to a neighbor."); +                        pthread_rwlock_unlock(&rib.flows_lock);                          return -1;                  } +        } +        pthread_rwlock_unlock(&rib.flows_lock); -                prev = node; -                node = node->child; -                sibling = false; +        return 0; +} -                /* Search horizontally */ -                while (node != NULL) { -                        if (strcmp(node->name, token) == 0) { -                                break; -                        } else { -                                prev = node; -                                node = node->sibling; -                                sibling = true; -                        } -                } +int ro_create(const char *      name, +              struct ro_props * props, +              uint8_t *         data, +              size_t            len) +{ +        struct rnode * node; +        ro_msg_t msg = RO_MSG__INIT; -                if (node == NULL) -                        break; -        } +        if (name == NULL || props == NULL) +                return -EINVAL; -        token = strtok_r(str1, PATH_DELIMITER, &saveptr); -        if (token != NULL) { +        pthread_mutex_lock(&rib.ro_lock); + +        node = ribmgr_ro_create(name, props, data, len); +        if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                LOG_ERR("Part of the pathname does not exist."); -                free(str); +                LOG_ERR("Failed to create RO.");                  return -1;          } -        free(str); - -        new = malloc(sizeof(*new)); -        if (new == NULL) { +        if (node->props->recv_set == NO_SYNC) {                  pthread_mutex_unlock(&rib.ro_lock); -                return -1; +                return 0;          } -        new->name = strdup(token); -        if (new->name == NULL) { +        if (ro_msg_create(node, &msg)) {                  pthread_mutex_unlock(&rib.ro_lock); -                free(new); +                LOG_ERR("Failed to create RO msg.");                  return -1;          } -        if (sibling) -                prev->sibling = new; -        else -                prev->child = new; - -        new->data = data; -        new->len = len; -        new->child = NULL; -        new->sibling = NULL; - -        pthread_mutex_lock(&rib.subs_lock); - -        list_for_each(p, &rib.subs) { -                struct ro_sub * e = list_entry(p, struct ro_sub, next); -                len_s = strlen(e->name); -                len_n = strlen(name); - -                if (len_n < len_s) -                        continue; - -                if (strncmp(name, e->name, len_s) == 0) { -                        if (e->ops->ro_created == NULL) -                                continue; - -                        ro_data = malloc(len); -                        if (ro_data == NULL) -                                continue; - -                        memcpy(ro_data, data, len); -                        e->ops->ro_created(name, ro_data, len); -                } +        if (send_neighbors_ro(node->full_name, &msg, CDAP_CREATE)) { +                pthread_mutex_unlock(&rib.ro_lock); +                LOG_ERR("Failed to send to neighbors."); +                return -1;          } -        pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock);          return 0; @@ -785,207 +1307,94 @@ int ro_create(const char * name,  int ro_delete(const char * name)  { -        char * str, * str1, * saveptr, * token; -        struct rnode * node, * prev; -        bool sibling = false; -        struct list_head * p = NULL; -        size_t len_s, len_n; +        struct rnode * node; +        ro_msg_t msg = RO_MSG__INIT; -        str = strdup(name); -        if (str == NULL) -                return -1; +        if (name == NULL) +                return -EINVAL;          pthread_mutex_lock(&rib.ro_lock); -        node = rib.root; -        prev = NULL; - -        for (str1 = str; ; str1 = NULL) { -                token = strtok_r(str1, PATH_DELIMITER, &saveptr); -                if (token == NULL) -                        break; - -                prev = node; -                node = node->child; -                sibling = false; +        node = find_rnode_by_name(name); +        if (node == NULL) { +                pthread_mutex_unlock(&rib.ro_lock); +                LOG_ERR("Failed to sync RO."); +                return -1; +        } -                while (node != NULL) { -                        if (strcmp(node->name, token) == 0) { -                                break; -                        } else { -                                prev = node; -                                node = node->sibling; -                                sibling = true; -                        } +        if (node->props->recv_set != NO_SYNC) { +                if (ro_msg_create(node, &msg)) { +                        pthread_mutex_unlock(&rib.ro_lock); +                        LOG_ERR("Failed to create RO msg."); +                        return -1;                  } -                if (node == NULL) { +                if (send_neighbors_ro(node->full_name, &msg, CDAP_DELETE)) {                          pthread_mutex_unlock(&rib.ro_lock); -                        free(str); +                        LOG_ERR("Failed to send to neighbors.");                          return -1;                  }          } -        if (node == rib.root) { -                LOG_ERR("Won't remove root."); -                free(str); +        if (ribmgr_ro_delete(name)) { +                pthread_mutex_unlock(&rib.ro_lock);                  return -1;          } -        free(node->name); -        if (node->data != NULL) -                free(node->data); - -        if (sibling) -                prev->sibling = node->sibling; -        else -                prev->child = node->sibling; - -        free(node); - -        pthread_mutex_lock(&rib.subs_lock); - -        list_for_each(p, &rib.subs) { -                struct ro_sub * e = list_entry(p, struct ro_sub, next); -                len_s = strlen(e->name); -                len_n = strlen(name); - -                if (len_n < len_s) -                        continue; - -                if (strncmp(name, e->name, len_s) == 0) { -                        if (e->ops->ro_deleted == NULL) -                                continue; - -                        e->ops->ro_deleted(name); -                } -        } - -        pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        free(str);          return 0;  } -static struct rnode * find_rnode_by_name(const char * name) +int ro_write(const char * name, +             uint8_t *    data, +             size_t       len)  { -        char * str, * str1, * saveptr, * token;          struct rnode * node; +        ro_msg_t msg = RO_MSG__INIT; -        str = strdup(name); -        if (str == NULL) -                return NULL; - -        node = rib.root; - -        for (str1 = str; ; str1 = NULL) { -                token = strtok_r(str1, PATH_DELIMITER, &saveptr); -                if (token == NULL) -                        break; - -                node = node->child; - -                while (node != NULL) -                        if (strcmp(node->name, token) == 0) -                                break; -                        else -                                node = node->sibling; - -                if (node == NULL) { -                        free(str); -                        return NULL; -                } -        } - -        free(str); -        return node; -} - -ssize_t ro_read(const char * name, -                uint8_t **   data) -{ -        struct rnode * node; -        ssize_t        len; +        if (name == NULL || data == NULL) +                return -EINVAL;          pthread_mutex_lock(&rib.ro_lock); -        node = find_rnode_by_name(name); +        node = ribmgr_ro_write(name, data, len);          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); +                LOG_ERR("Failed to create RO.");                  return -1;          } -        *data = malloc(node->len); -        if (*data == NULL) { +        if (node->props->recv_set == NO_SYNC) {                  pthread_mutex_unlock(&rib.ro_lock); -                return -1; +                return 0;          } -        memcpy(*data, node->data, node->len); -        len = node->len; - -        pthread_mutex_unlock(&rib.ro_lock); - -        return len; -} - -int ro_write(const char * name, -             uint8_t *    data, -             size_t       len) -{ -        struct rnode * node; -        struct list_head * p = NULL; -        size_t len_s, len_n; -        uint8_t * ro_data; - -        pthread_mutex_lock(&rib.ro_lock); - -        node = find_rnode_by_name(name); -        if (node == NULL) { +        if (ro_msg_create(node, &msg)) {                  pthread_mutex_unlock(&rib.ro_lock); +                LOG_ERR("Failed to create RO msg.");                  return -1;          } -        free(node->data); - -        node->data = data; -        node->len = len; - -        pthread_mutex_lock(&rib.subs_lock); - -        list_for_each(p, &rib.subs) { -                struct ro_sub * e = -                        list_entry(p, struct ro_sub, next); -                len_s = strlen(e->name); -                len_n = strlen(name); - -                if (len_n < len_s) -                        continue; - -                if (strncmp(name, e->name, len_s) == 0) { -                        if (e->ops->ro_updated == NULL) -                                continue; - -                        ro_data = malloc(len); -                        if (ro_data == NULL) -                                continue; - -                        memcpy(ro_data, data, len); -                        e->ops->ro_updated(name, ro_data, len); -                } +        if (send_neighbors_ro(node->full_name, &msg, CDAP_WRITE)) { +                pthread_mutex_unlock(&rib.ro_lock); +                LOG_ERR("Failed to send to neighbors."); +                return -1;          } -        pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock);          return 0;  } -int ro_props(const char *      name, -             struct ro_props * props) +ssize_t ro_read(const char * name, +                uint8_t **   data)  {          struct rnode * node; +        ssize_t        len; + +        if (name == NULL || data == NULL) +                return -EINVAL;          pthread_mutex_lock(&rib.ro_lock); @@ -995,27 +1404,18 @@ int ro_props(const char *      name,                  return -1;          } -        if (node->props != NULL) { -                if (node->props->expiry != NULL) -                        free(node->props->expiry); -                free(node->props); +        *data = malloc(node->len); +        if (*data == NULL) { +                pthread_mutex_unlock(&rib.ro_lock); +                return -1;          } -        node->props = props; +        memcpy(*data, node->data, node->len); +        len = node->len;          pthread_mutex_unlock(&rib.ro_lock); -        return 0; -} - -int ro_sync(const char * name) -{ -        (void) name; - -        LOG_MISSING; -        /* FIXME: We need whatevercast sets first */ - -        return -1; +        return len;  }  int ro_subscribe(const char *        name, diff --git a/src/ipcpd/normal/ro.h b/src/ipcpd/normal/ro.h index 0dfa7e8a..6fa1db2a 100644 --- a/src/ipcpd/normal/ro.h +++ b/src/ipcpd/normal/ro.h @@ -24,38 +24,33 @@  #define OUROBOROS_IPCP_RO_H  enum ro_recv_set { -        ALL_MEMBERS = 0, -        NEIGHBORS +        NO_SYNC = 0, +        NEIGHBORS, +        ALL_MEMBERS  };  struct ro_props { -        bool              enrol_sync; -        enum ro_recv_set  recv_set; -        struct timespec * expiry; +        bool             enrol_sync; +        enum ro_recv_set recv_set; +        struct timespec  expiry;  };  /* All RIB-objects have a pathname, separated by a slash. */ -/* Takes ownership of the data */ -int          ro_create(const char * name, -                       uint8_t *    data, -                       size_t       len); +/* Takes ownership of the data and props */ +int          ro_create(const char *      name, +                       struct ro_props * props, +                       uint8_t *         data, +                       size_t            len);  int          ro_delete(const char * name); -/* Reader takes ownership of data */ -ssize_t      ro_read(const char * name, -                     uint8_t **   data); -  int          ro_write(const char * name,                        uint8_t *    data,                        size_t       len); -/* Takes ownership of the props */ -int          ro_props(const char *      name, -                      struct ro_props * props); - -/* Sync changes with other members in the DIF */ -int          ro_sync(const char * name); +/* Reader takes ownership of data */ +ssize_t      ro_read(const char * name, +                     uint8_t **   data);  /* Callback passes ownership of the data */  struct ro_sub_ops { diff --git a/src/ipcpd/normal/ro.proto b/src/ipcpd/normal/ro.proto new file mode 100644 index 00000000..c5062468 --- /dev/null +++ b/src/ipcpd/normal/ro.proto @@ -0,0 +1,9 @@ +message ro_msg { +        required uint64 address    = 1; +        required uint64 seqno      = 2; +        required int32  recv_set   = 3; +        required bool   enrol_sync = 4; +        required uint32 sec        = 5; +        required uint64 nsec       = 6; +        required bytes  value      = 7; +}
\ No newline at end of file | 
