diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-12-24 12:54:16 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-12-24 12:54:16 +0100 | 
| commit | 7348a7d587adc3cb1bc11ef7513ef7b03bc40349 (patch) | |
| tree | 54c4406c135621d44fd9468c12a6b26e3e5d4f74 /src/ipcpd/normal/ribmgr.c | |
| parent | 55eaed508c9f68b350f29cf2c4e96be4e57b0b37 (diff) | |
| parent | 8910bd28e2b6269f0900c8215352ab5d177a3b54 (diff) | |
| download | ouroboros-7348a7d587adc3cb1bc11ef7513ef7b03bc40349.tar.gz ouroboros-7348a7d587adc3cb1bc11ef7513ef7b03bc40349.zip  | |
Merged in dstaesse/ouroboros/be-normal (pull request #325)
Be normal
Diffstat (limited to 'src/ipcpd/normal/ribmgr.c')
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 501 | 
1 files changed, 195 insertions, 306 deletions
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index b0738a0c..1e9bcc18 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -42,7 +42,6 @@  #include "dt_const.h"  #include "frct.h"  #include "ipcp.h" -#include "cdap_request.h"  #include "ro.h"  #include "path.h"  #include "dir.h" @@ -85,22 +84,28 @@ struct rnode {  };  struct mgmt_flow { +        struct list_head next; +          struct cdap *    instance;          int              fd; -        struct list_head next; + +        pthread_t        handler;  };  struct ro_sub { +        struct list_head    next; +          int                 sid; +          char *              name;          struct ro_sub_ops * ops; -        struct list_head    next;  };  struct ro_id { +        struct list_head next; +          uint64_t         seqno;          char *           full_name; -        struct list_head next;  };  struct { @@ -124,9 +129,6 @@ struct {          struct list_head    flows;          pthread_rwlock_t    flows_lock; -        struct list_head    cdap_reqs; -        pthread_mutex_t     cdap_reqs_lock; -          struct addr_auth *  addr_auth;          enum pol_addr_auth  addr_auth_type;  } rib; @@ -173,7 +175,7 @@ void ribmgr_ro_created(const char * name,          pthread_rwlock_unlock(&ipcpi.state_lock);  } -/* We only have a create operation for now */ +/* We only have a create operation for now. */  static struct ro_sub_ops ribmgr_sub_ops = {          .ro_created = ribmgr_ro_created,          .ro_updated = NULL, @@ -303,9 +305,12 @@ static void ro_delete_timer(void * o)  {          char * name = (char *) o; -        if (ribmgr_ro_delete(name)) { +        pthread_mutex_lock(&rib.ro_lock); + +        if (ribmgr_ro_delete(name))                  LOG_ERR("Failed to delete %s.", name); -        } + +        pthread_mutex_unlock(&rib.ro_lock);  }  static struct rnode * ribmgr_ro_create(const char *   name, @@ -342,7 +347,7 @@ static struct rnode * ribmgr_ro_create(const char *   name,                  node = node->child;                  sibling = false; -                /* Search horizontally */ +                /* Search horizontally. */                  while (node != NULL) {                          if (strcmp(node->name, token) == 0) {                                  break; @@ -400,15 +405,12 @@ static struct rnode * ribmgr_ro_create(const char *   name,          LOG_DBG("Created RO with name %s.", name); -        if (!(attr.expiry.tv_sec == 0 && -              attr.expiry.tv_nsec == 0)) { +        if (!(attr.expiry.tv_sec == 0 && attr.expiry.tv_nsec == 0)) {                  timeout = attr.expiry.tv_sec * 1000 +                          attr.expiry.tv_nsec / MILLION; -                if (timerwheel_add(rib.wheel, ro_delete_timer, -                                   new->full_name, strlen(new->full_name) + 1, -                                   timeout)) { +                if (timerwheel_add(rib.wheel, ro_delete_timer, new->full_name, +                                   strlen(new->full_name) + 1, timeout))                          LOG_ERR("Failed to add deletion timer of RO."); -                }          }          return new; @@ -434,51 +436,6 @@ static struct rnode * ribmgr_ro_write(const char * name,          return node;  } -/* Call while holding cdap_reqs_lock */ -/* FIXME: better not to call blocking functions under any lock */ -int cdap_result_wait(struct cdap * instance, -                     enum cdap_opcode code, -                     char * name, -                     int invoke_id) -{ -        struct cdap_request * req; -        int ret; -        char * name_dup = strdup(name); -        if (name_dup == NULL) -                return -1; - -        req = cdap_request_create(code, name_dup, invoke_id, instance); -        if (req == NULL) { -                free(name_dup); -                return -1; -        } - -        list_add(&req->next, &rib.cdap_reqs); - -        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -        ret = cdap_request_wait(req); - -        pthread_mutex_lock(&rib.cdap_reqs_lock); - -        if (ret == -1)  /* should only be on ipcp shutdown */ -                LOG_DBG("Waiting CDAP request destroyed."); - -        if (ret == -ETIMEDOUT) -                LOG_ERR("CDAP Request timed out."); - -        if (ret) -                LOG_DBG("Unknown error code: %d.", ret); - -        if (!ret) -                ret = req->result; - -        list_del(&req->next); -        cdap_request_destroy(req); - -        return ret; -} -  static int write_ro_msg(struct cdap *    neighbor,                          ro_msg_t *       msg,                          char *           name, @@ -486,7 +443,8 @@ static int write_ro_msg(struct cdap *    neighbor,  {          uint8_t * data;          size_t len; -        int iid = 0; +        cdap_key_t key; +        int ret;          len = ro_msg__get_packed_size(msg);          if (len == 0) @@ -498,23 +456,21 @@ static int write_ro_msg(struct cdap *    neighbor,          ro_msg__pack(msg, data); -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        iid = cdap_send_request(neighbor, code, -                                name, data, len, 0); -        if (iid < 0) { -                pthread_mutex_unlock(&rib.cdap_reqs_lock); +        key = cdap_request_send(neighbor, code, name, data, len, 0); +        if (key < 0) { +                LOG_ERR("Failed to send CDAP request.");                  free(data);                  return -1;          } -        if (cdap_result_wait(neighbor, code, name, iid)) { -                pthread_mutex_unlock(&rib.cdap_reqs_lock); -                free(data); -                LOG_ERR("Remote did not receive RIB object."); +        free(data); + +        ret = cdap_reply_wait(neighbor, key, NULL, NULL); +        if (ret < 0) { +                LOG_ERR("CDAP command with code %d and name %s failed:  %d.", +                        code, name, ret);                  return -1;          } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); -        free(data);          return 0;  } @@ -522,7 +478,6 @@ static int write_ro_msg(struct cdap *    neighbor,  int ribmgr_init()  {          INIT_LIST_HEAD(&rib.flows); -        INIT_LIST_HEAD(&rib.cdap_reqs);          INIT_LIST_HEAD(&rib.subs);          INIT_LIST_HEAD(&rib.ro_ids); @@ -540,17 +495,9 @@ int ribmgr_init()                  return -1;          } -        if (pthread_mutex_init(&rib.cdap_reqs_lock, NULL)) { -                LOG_ERR("Failed to initialize mutex."); -                pthread_rwlock_destroy(&rib.flows_lock); -                free(rib.root); -                return -1; -        } -          if (pthread_mutex_init(&rib.ro_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  free(rib.root);                  return -1;          } @@ -558,7 +505,6 @@ int ribmgr_init()          if (pthread_mutex_init(&rib.subs_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  free(rib.root);                  return -1; @@ -567,7 +513,6 @@ int ribmgr_init()          if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {                  LOG_ERR("Failed to initialize mutex.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  free(rib.root); @@ -578,7 +523,6 @@ int ribmgr_init()          if (rib.sids == NULL) {                  LOG_ERR("Failed to create bitmap.");                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -591,7 +535,6 @@ int ribmgr_init()                  LOG_ERR("Failed to create timerwheel.");                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -605,7 +548,6 @@ int ribmgr_init()                  timerwheel_destroy(rib.wheel);                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock); -                pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock);                  pthread_mutex_destroy(&rib.ro_ids_lock); @@ -633,16 +575,6 @@ int ribmgr_fini()          struct list_head * pos = NULL;          struct list_head * n = NULL; -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        list_for_each_safe(pos, n, &rib.cdap_reqs) { -                struct cdap_request * req = -                        list_entry(pos, struct cdap_request, next); -                free(req->name); -                list_del(&req->next); -                free(req); -        } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); -          pthread_rwlock_wrlock(&rib.flows_lock);          list_for_each_safe(pos, n, &rib.flows) {                  struct mgmt_flow * flow = @@ -668,7 +600,6 @@ int ribmgr_fini()          timerwheel_destroy(rib.wheel);          pthread_mutex_destroy(&rib.subs_lock); -        pthread_mutex_destroy(&rib.cdap_reqs_lock);          pthread_mutex_destroy(&rib.ro_lock);          pthread_rwlock_destroy(&rib.flows_lock);          pthread_mutex_destroy(&rib.ro_ids_lock); @@ -676,49 +607,8 @@ int ribmgr_fini()          return 0;  } -static int ribmgr_cdap_reply(struct cdap * instance, -                             int           invoke_id, -                             int           result, -                             uint8_t *     data, -                             size_t        len) -{ -        struct list_head * pos, * n = NULL; - -        /* We never perform reads on other RIBs */ -        (void) data; -        (void) len; - -        pthread_mutex_lock(&rib.cdap_reqs_lock); - -        list_for_each_safe(pos, n, &rib.cdap_reqs) { -                struct cdap_request * req = -                        list_entry(pos, struct cdap_request, next); -                if (req->instance == instance && -                    req->invoke_id == invoke_id && -                    req->state == REQ_PENDING) { -                        if (result != 0) -                                LOG_ERR("CDAP command with code %d and name %s " -                                        "failed with error %d", -                                        req->code, req->name, result); -                        else -                                LOG_DBG("CDAP command with code %d and name %s " -                                        "executed succesfully", -                                        req->code, req->name); - -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -                        cdap_request_respond(req, result); - -                        pthread_mutex_lock(&rib.cdap_reqs_lock); -                } -        } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); - -        return 0; -} -  static int ribmgr_cdap_create(struct cdap * instance, -                              int           invoke_id, +                              cdap_key_t    key,                                char *        name,                                ro_msg_t *    msg)  { @@ -729,6 +619,8 @@ static int ribmgr_cdap_create(struct cdap * instance,          struct ro_attr attr;          struct rnode * node; +        assert(instance); +          ro_attr_init(&attr);          attr.expiry.tv_sec = msg->sec;          attr.expiry.tv_nsec = msg->nsec; @@ -740,7 +632,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          ro_data = malloc(msg->value.len);          if (ro_data == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          memcpy(ro_data, msg->value.data, msg->value.len); @@ -748,7 +640,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          node = ribmgr_ro_create(name, attr, ro_data, msg->value.len);          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  free(ro_data);                  return -1;          } @@ -778,7 +670,7 @@ static int ribmgr_cdap_create(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to create request.");                  return -1;          } @@ -787,7 +679,7 @@ static int ribmgr_cdap_create(struct cdap * instance,  }  static int ribmgr_cdap_delete(struct cdap * instance, -                              int           invoke_id, +                              cdap_key_t    key,                                char *        name)  {          struct list_head * p = NULL; @@ -798,7 +690,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,          if (ribmgr_ro_delete(name)) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          } @@ -823,7 +715,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +        if (cdap_reply_send(instance, key, 0, NULL, 0)) {                  LOG_ERR("Failed to send reply to create request.");                  return -1;          } @@ -832,7 +724,7 @@ static int ribmgr_cdap_delete(struct cdap * instance,  }  static int ribmgr_cdap_write(struct cdap * instance, -                             int           invoke_id, +                             cdap_key_t    key,                               char *        name,                               ro_msg_t *    msg,                               uint32_t      flags) @@ -851,7 +743,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          ro_data = malloc(msg->value.len);          if (ro_data == NULL) {                  pthread_mutex_unlock(&rib.ro_lock); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          memcpy(ro_data, msg->value.data, msg->value.len); @@ -860,7 +752,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          if (node == NULL) {                  pthread_mutex_unlock(&rib.ro_lock);                  free(ro_data); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); +                cdap_reply_send(instance, key, -1, NULL, 0);                  return -1;          }          node->seqno = msg->seqno; @@ -891,7 +783,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          pthread_mutex_unlock(&rib.subs_lock);          pthread_mutex_unlock(&rib.ro_lock); -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  LOG_ERR("Failed to send reply to write request.");                  return -1;          } @@ -899,8 +791,7 @@ static int ribmgr_cdap_write(struct cdap * instance,          return 0;  } -static int ribmgr_enrol_sync(struct cdap * instance, -                             struct rnode * node) +static int ribmgr_enrol_sync(struct cdap * instance, struct rnode * node)  {          int ret = 0; @@ -931,24 +822,28 @@ static int ribmgr_enrol_sync(struct cdap * instance,  }  static int ribmgr_cdap_start(struct cdap * instance, -                             int           invoke_id, +                             cdap_key_t    key,                               char *        name)  { -        int iid = 0; - -        pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_OPERATIONAL && -            strcmp(name, ENROLLMENT) == 0) { +        if (strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("New enrollment request."); -                if (cdap_send_reply(instance, invoke_id, 0, NULL, 0)) { +                pthread_rwlock_wrlock(&ipcpi.state_lock); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        pthread_rwlock_unlock(&ipcpi.state_lock); +                        LOG_ERR("IPCP in wrong state."); +                        return -1; +                } + +                if (cdap_reply_send(instance, key, 0, NULL, 0)) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send reply to enrollment request.");                          return -1;                  } -                /* Loop through rtree and send correct objects */ -                LOG_DBGF("Sending ROs that need to be sent on enrolment..."); +                /* Loop through rtree and send correct objects. */ +                LOG_DBG("Sending ROs that need to be sent on enrolment...");                  pthread_mutex_lock(&rib.ro_lock);                  if (ribmgr_enrol_sync(instance, rib.root->child)) { @@ -957,57 +852,48 @@ static int ribmgr_cdap_start(struct cdap * instance,                          LOG_ERR("Failed to sync part of the RIB.");                          return -1;                  } +                  pthread_mutex_unlock(&rib.ro_lock);                  LOG_DBGF("Sending stop enrollment..."); -                pthread_mutex_lock(&rib.cdap_reqs_lock); - -                iid = cdap_send_request(instance, CDAP_STOP, ENROLLMENT, +                key = cdap_request_send(instance, CDAP_STOP, ENROLLMENT,                                          NULL, 0, 0); -                if (iid < 0) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                if (key < 0) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Failed to send stop of enrollment.");                          return -1;                  } -                if (cdap_result_wait(instance, CDAP_STOP, -                                     ENROLLMENT, iid)) { -                        pthread_mutex_unlock(&rib.cdap_reqs_lock); +                if (cdap_reply_wait(instance, key, NULL, NULL)) {                          pthread_rwlock_unlock(&ipcpi.state_lock);                          LOG_ERR("Remote failed to complete enrollment.");                          return -1;                  } -                pthread_mutex_unlock(&rib.cdap_reqs_lock); + +                pthread_rwlock_unlock(&ipcpi.state_lock);          } else { -                if (cdap_send_reply(instance, invoke_id, -1, NULL, 0)) { -                        pthread_rwlock_unlock(&ipcpi.state_lock); -                        LOG_ERR("Failed to send reply to start request."); -                        return -1; -                } +                LOG_WARN("Request to start unknown operation."); +                if (cdap_reply_send(instance, key, -1, NULL, 0)) +                        LOG_ERR("Failed to send negative reply.");          } -        pthread_rwlock_unlock(&ipcpi.state_lock);          return 0;  } -static int ribmgr_cdap_stop(struct cdap * instance, -                            int           invoke_id, -                            char *        name) +static int ribmgr_cdap_stop(struct cdap * instance, cdap_key_t key, char * name)  {          int ret = 0;          pthread_rwlock_wrlock(&ipcpi.state_lock); -        if (ipcp_get_state() == IPCP_CONFIG && -            strcmp(name, ENROLLMENT) == 0) { +        if (ipcp_get_state() == IPCP_CONFIG && strcmp(name, ENROLLMENT) == 0) {                  LOG_DBG("Stop enrollment received.");                  ipcp_set_state(IPCP_BOOTING);          } else                  ret = -1; -        if (cdap_send_reply(instance, invoke_id, ret, NULL, 0)) { +        if (cdap_reply_send(instance, key, ret, NULL, 0)) {                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to send reply to stop request.");                  return -1; @@ -1028,8 +914,7 @@ static void ro_id_delete(void * o)          pthread_mutex_unlock(&rib.ro_ids_lock);  } -static int ro_id_create(char *     name, -                        ro_msg_t * msg) +static int ro_id_create(char * name, ro_msg_t * msg)  {          struct ro_id * tmp; @@ -1062,105 +947,113 @@ static int ro_id_create(char *     name,          return 0;  } -static int ribmgr_cdap_request(struct cdap *    instance, -                               int              invoke_id, -                               enum cdap_opcode opcode, -                               char *           name, -                               uint8_t *        data, -                               size_t           len, -                               uint32_t         flags) +static void * cdap_req_handler(void * o)  { +        struct cdap * instance = (struct cdap *) o; +        enum cdap_opcode opcode; +        char * name; +        uint8_t * data; +        size_t len; +        uint32_t flags;          ro_msg_t * msg; -        int ret = -1;          struct list_head * p = NULL; -        if (opcode == CDAP_START) -                return ribmgr_cdap_start(instance, -                                         invoke_id, -                                         name); -        else if (opcode == CDAP_STOP) -                return ribmgr_cdap_stop(instance, -                                        invoke_id, -                                        name); - -        msg = ro_msg__unpack(NULL, len, data); -        if (msg == NULL) { -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                LOG_ERR("Failed to unpack RO message"); -                return -1; -        } +        assert(instance); -        pthread_mutex_lock(&rib.ro_ids_lock); -        list_for_each(p, &rib.ro_ids) { -                struct ro_id * e = list_entry(p, struct ro_id, next); +        while (true) { +                cdap_key_t key = cdap_request_wait(instance, +                                                   &opcode, +                                                   &name, +                                                   &data, +                                                   &len, +                                                   &flags); +                assert(key >= 0); -                if (strcmp(e->full_name, name) == 0 && -                    e->seqno == msg->seqno) { -                        pthread_mutex_unlock(&rib.ro_ids_lock); -                        ro_msg__free_unpacked(msg, NULL); -                        cdap_send_reply(instance, invoke_id, 0, NULL, 0); -                        LOG_DBG("Already received this RO."); -                        return 0; +                if (opcode == CDAP_START) { +                        if (ribmgr_cdap_start(instance, key, name)) +                                LOG_WARN("CDAP start failed."); +                        continue; +                } +                else if (opcode == CDAP_STOP) { +                        if (ribmgr_cdap_stop(instance, key, name)) +                                LOG_WARN("CDAP stop failed."); +                        continue;                  } -        } -        pthread_mutex_unlock(&rib.ro_ids_lock); - -        if (opcode == CDAP_CREATE) { -                ret = ribmgr_cdap_create(instance, -                                         invoke_id, -                                         name, -                                         msg); -        } else if (opcode == CDAP_WRITE) { -                ret = ribmgr_cdap_write(instance, -                                        invoke_id, -                                        name, msg, -                                        flags); - -        } else if (opcode == CDAP_DELETE) { -                ret = ribmgr_cdap_delete(instance, -                                         invoke_id, -                                         name); -        } else { -                LOG_INFO("Unsupported opcode received."); -                ro_msg__free_unpacked(msg, NULL); -                cdap_send_reply(instance, invoke_id, -1, NULL, 0); -                return -1; -        } -        if (ro_id_create(name, msg)) { -                LOG_ERR("Failed to create RO id."); -                return -1; -        } +                msg = ro_msg__unpack(NULL, len, data); +                if (msg == NULL) { +                        cdap_reply_send(instance, key, -1, NULL, 0); +                        LOG_WARN("Failed to unpack RO message"); +                        continue; +                } -        if (msg->recv_set == ALL_MEMBERS) { -                pthread_rwlock_rdlock(&rib.flows_lock); -                list_for_each(p, &rib.flows) { -                        struct mgmt_flow * e = -                                list_entry(p, struct mgmt_flow, next); +                pthread_mutex_lock(&rib.ro_ids_lock); +                list_for_each(p, &rib.ro_ids) { +                        struct ro_id * e = list_entry(p, struct ro_id, next); -                        /* Don't send it back */ -                        if (e->instance == instance) +                        if (strcmp(e->full_name, name) == 0 && +                            e->seqno == msg->seqno) { +                                pthread_mutex_unlock(&rib.ro_ids_lock); +                                ro_msg__free_unpacked(msg, NULL); +                                cdap_reply_send(instance, key, 0, NULL, 0); +                                LOG_DBG("Already received this RO.");                                  continue; +                        } +                } +                pthread_mutex_unlock(&rib.ro_ids_lock); -                        if (write_ro_msg(e->instance, msg, name, opcode)) { -                                LOG_ERR("Failed to send to a neighbor."); -                                pthread_rwlock_unlock(&rib.flows_lock); +                if (opcode == CDAP_CREATE) { +                        if (ribmgr_cdap_create(instance, key, name, msg)) { +                                LOG_WARN("CDAP create failed.");                                  ro_msg__free_unpacked(msg, NULL); -                                return -1; +                                continue;                          } +                } else if (opcode == CDAP_WRITE) { +                        if (ribmgr_cdap_write(instance, key, name, +                                              msg, flags)) { +                                LOG_WARN("CDAP write failed."); +                                ro_msg__free_unpacked(msg, NULL); +                                continue; +                        } +                } else if (opcode == CDAP_DELETE) { +                        if (ribmgr_cdap_delete(instance, key, name)) { +                                LOG_WARN("CDAP delete failed."); +                                ro_msg__free_unpacked(msg, NULL); +                                continue; +                        } +                } else { +                        LOG_INFO("Unsupported opcode received."); +                        ro_msg__free_unpacked(msg, NULL); +                        cdap_reply_send(instance, key, -1, NULL, 0); +                        continue;                  } -                pthread_rwlock_unlock(&rib.flows_lock); -        } -        ro_msg__free_unpacked(msg, NULL); +                if (ro_id_create(name, msg)) { +                        LOG_WARN("Failed to create RO id."); +                        ro_msg__free_unpacked(msg, NULL); +                        continue; +                } -        return ret; -} +                if (msg->recv_set == ALL_MEMBERS) { +                        pthread_rwlock_rdlock(&rib.flows_lock); +                        list_for_each(p, &rib.flows) { +                                struct mgmt_flow * e = +                                        list_entry(p, struct mgmt_flow, next); -static struct cdap_ops ribmgr_cdap_ops = { -        .cdap_reply   = ribmgr_cdap_reply, -        .cdap_request = ribmgr_cdap_request -}; +                                /* Don't send it back. */ +                                if (e->instance == instance) +                                        continue; + +                                if (write_ro_msg(e->instance, msg, +                                                 name, opcode)) +                                        LOG_WARN("Failed to send to neighbor."); +                        } +                        pthread_rwlock_unlock(&rib.flows_lock); +                } + +                ro_msg__free_unpacked(msg, NULL); +        } +}  int ribmgr_add_flow(int fd)  { @@ -1169,9 +1062,9 @@ int ribmgr_add_flow(int fd)          flow = malloc(sizeof(*flow));          if (flow == NULL) -                return -1; +                return -ENOMEM; -        instance = cdap_create(&ribmgr_cdap_ops, fd); +        instance = cdap_create(fd);          if (instance == NULL) {                  LOG_ERR("Failed to create CDAP instance");                  free(flow); @@ -1182,8 +1075,17 @@ int ribmgr_add_flow(int fd)          flow->instance = instance;          flow->fd = fd; +        if (pthread_create(&flow->handler, NULL, +                           cdap_req_handler, instance)) { +                LOG_ERR("Failed to start handler thread for mgt flow."); +                free(flow); +                return -1; +        } +          pthread_rwlock_wrlock(&rib.flows_lock); +          list_add(&flow->next, &rib.flows); +          pthread_rwlock_unlock(&rib.flows_lock);          return 0; @@ -1198,6 +1100,7 @@ int ribmgr_remove_flow(int fd)                  struct mgmt_flow * flow =                          list_entry(pos, struct mgmt_flow, next);                  if (flow->fd == fd) { +                        pthread_cancel(flow->handler);                          if (cdap_destroy(flow->instance))                                  LOG_ERR("Failed to destroy CDAP instance.");                          list_del(&flow->next); @@ -1218,10 +1121,9 @@ int ribmgr_bootstrap(struct dif_config * conf)          size_t len = 0;          struct ro_attr attr; -        if (conf == NULL || -            conf->type != IPCP_NORMAL) { +        if (conf == NULL || conf->type != IPCP_NORMAL) {                  LOG_ERR("Bad DIF configuration."); -                return -1; +                return -EINVAL;          }          ro_attr_init(&attr); @@ -1246,7 +1148,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          len = static_info_msg__get_packed_size(&stat_info);          if (len == 0) {                  LOG_ERR("Failed to get size of static information."); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1254,7 +1155,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          data = malloc(len);          if (data == NULL) {                  LOG_ERR("Failed to allocate memory."); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1265,7 +1165,6 @@ int ribmgr_bootstrap(struct dif_config * conf)                               attr, data, len) == NULL) {                  LOG_ERR("Failed to create static info RO.");                  free(data); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1273,7 +1172,6 @@ int ribmgr_bootstrap(struct dif_config * conf)          if (dir_init()) {                  LOG_ERR("Failed to init directory");                  ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1282,7 +1180,6 @@ int ribmgr_bootstrap(struct dif_config * conf)                  LOG_ERR("Failed to initialize FRCT.");                  dir_fini();                  ribmgr_ro_delete(RIBMGR_PREFIX STAT_INFO); -                addr_auth_destroy(rib.addr_auth);                  ribmgr_ro_delete(RIBMGR_PREFIX);                  return -1;          } @@ -1296,12 +1193,14 @@ int ribmgr_enrol(void)  {          struct cdap * instance = NULL;          struct mgmt_flow * flow; -        int iid = 0; +        cdap_key_t key; +        int ret;          pthread_rwlock_wrlock(&ipcpi.state_lock);          if (ipcp_get_state() != IPCP_INIT) {                  pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("IPCP in wrong state.");                  return -1;          } @@ -1312,36 +1211,31 @@ int ribmgr_enrol(void)                  ipcp_set_state(IPCP_INIT);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); +                LOG_ERR("No flows in RIB.");                  return -1;          } -        flow = list_entry((&rib.flows)->next, struct mgmt_flow, next); +        flow = list_first_entry((&rib.flows), struct mgmt_flow, next);          instance = flow->instance; -        pthread_mutex_lock(&rib.cdap_reqs_lock); -        iid = cdap_send_request(instance, -                                CDAP_START, -                                ENROLLMENT, -                                NULL, 0, 0); -        if (iid < 0) { +        key = cdap_request_send(instance, CDAP_START, ENROLLMENT, NULL, 0, 0); +        if (key < 0) {                  ipcp_set_state(IPCP_INIT); -                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock);                  LOG_ERR("Failed to start enrollment.");                  return -1;          } -        if (cdap_result_wait(instance, CDAP_START, -                             ENROLLMENT, iid)) { +        ret = cdap_reply_wait(instance, key, NULL, NULL); +        if (ret) {                  ipcp_set_state(IPCP_INIT); -                pthread_mutex_unlock(&rib.cdap_reqs_lock);                  pthread_rwlock_unlock(&rib.flows_lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Failed to start enrollment."); +                LOG_ERR("Failed to enroll: %d.", ret);                  return -1;          } -        pthread_mutex_unlock(&rib.cdap_reqs_lock); +          pthread_rwlock_unlock(&rib.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1351,9 +1245,10 @@ int ribmgr_enrol(void)  int ribmgr_start_policies(void)  {          pthread_rwlock_rdlock(&ipcpi.state_lock); +          if (ipcp_get_state() != IPCP_BOOTING) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_ERR("Cannot start policies in wrong state"); +                LOG_ERR("Cannot start policies in wrong state.");                  return -1;          }          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1365,7 +1260,7 @@ int ribmgr_start_policies(void)          }          rib.address = rib.addr_auth->address(); -        LOG_DBG("IPCP has address %lu", (unsigned long) rib.address); +        LOG_DBG("IPCP has address %lu.", (unsigned long) rib.address);          return 0;  } @@ -1380,22 +1275,21 @@ uint64_t ribmgr_address()          return rib.address;  } -static int send_neighbors_ro(char *           name, -                             ro_msg_t *       msg, -                             enum cdap_opcode code) +static int send_neighbors_ro(char * name, ro_msg_t * msg, enum cdap_opcode code)  {          struct list_head * p = NULL;          pthread_rwlock_rdlock(&rib.flows_lock); +          list_for_each(p, &rib.flows) {                  struct mgmt_flow * e = list_entry(p, struct mgmt_flow, next); -                  if (write_ro_msg(e->instance, msg, name, code)) { -                        LOG_ERR("Failed to send to a neighbor.");                          pthread_rwlock_unlock(&rib.flows_lock); +                        LOG_ERR("Failed to send to a neighbor.");                          return -1;                  }          } +          pthread_rwlock_unlock(&rib.flows_lock);          return 0; @@ -1499,9 +1393,7 @@ int ro_delete(const char * name)          return 0;  } -int ro_write(const char * name, -             uint8_t *    data, -             size_t       len) +int ro_write(const char * name, uint8_t * data, size_t len)  {          struct rnode * node;          ro_msg_t msg = RO_MSG__INIT; @@ -1541,8 +1433,7 @@ int ro_write(const char * name,          return 0;  } -ssize_t ro_read(const char * name, -                uint8_t **   data) +ssize_t ro_read(const char * name, uint8_t ** data)  {          struct rnode * node;          ssize_t        len; @@ -1572,8 +1463,7 @@ ssize_t ro_read(const char * name,          return len;  } -ssize_t ro_children(const char * name, -                    char ***     children) +ssize_t ro_children(const char * name, char *** children)  {          struct rnode * node;          struct rnode * child; @@ -1640,8 +1530,7 @@ bool ro_exists(const char * name)          return found;  } -int ro_subscribe(const char *        name, -                 struct ro_sub_ops * ops) +int ro_subscribe(const char * name, struct ro_sub_ops * ops)  {          struct ro_sub * sub;          int sid;  | 
