diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-11-28 15:24:26 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-11-28 15:30:32 +0100 | 
| commit | 8dc6e2ad8a9ab356be2d746dd680a4113fb0bcbc (patch) | |
| tree | 1c4593179929d735331424a60ff56e9a00f1c499 /src/ipcpd/normal | |
| parent | 356da57dbd882a2e9a380c4d8b60f132f41cc593 (diff) | |
| download | ouroboros-8dc6e2ad8a9ab356be2d746dd680a4113fb0bcbc.tar.gz ouroboros-8dc6e2ad8a9ab356be2d746dd680a4113fb0bcbc.zip | |
ipcpd: normal: Add timerwheel to RIB manager
The RIB manager now keeps track of ROs it has already received. The
identification of a RO it knows is kept in a struct ro id. For the
deletion of these RO ids, a timerwheel to the RIB manager. It also
deletes ROs if they have a specified timeout.
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/ribmgr.c | 300 | 
1 files changed, 219 insertions, 81 deletions
| diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c index 49971eda..524c5a39 100644 --- a/src/ipcpd/normal/ribmgr.c +++ b/src/ipcpd/normal/ribmgr.c @@ -36,6 +36,7 @@  #include <string.h>  #include <errno.h> +#include "timerwheel.h"  #include "addr_auth.h"  #include "ribmgr.h"  #include "dt_const.h" @@ -50,13 +51,16 @@ typedef StaticInfoMsg static_info_msg_t;  #include "ro.pb-c.h"  typedef RoMsg ro_msg_t; -#define SUBS_SIZE 25 +#define SUBS_SIZE        25 +#define WHEEL_RESOLUTION 1000 /* ms */ +#define WHEEL_DELAY      3600000 /* ms */ +#define RO_ID_TIMEOUT    1000 /* ms */ -#define ENROLLMENT     "enrollment" +#define ENROLLMENT       "enrollment" -#define RIBMGR_PREFIX  "/ribmgr" -#define STAT_INFO      "/statinfo" -#define PATH_DELIMITER "/" +#define RIBMGR_PREFIX    "/ribmgr" +#define STAT_INFO        "/statinfo" +#define PATH_DELIMITER   "/"  /* RIB objects */  struct rnode { @@ -92,26 +96,37 @@ struct ro_sub {          struct list_head    next;  }; +struct ro_id { +        uint64_t         seqno; +        char *           full_name; +        struct list_head next; +}; +  struct { -        struct rnode *     root; -        pthread_mutex_t    ro_lock; +        struct rnode *      root; +        pthread_mutex_t     ro_lock; -        struct list_head   subs; -        struct bmp *       sids; -        pthread_mutex_t    subs_lock; -        int                ribmgr_sid; +        struct list_head    subs; +        struct bmp *        sids; +        pthread_mutex_t     subs_lock; +        int                 ribmgr_sid; -        struct dt_const    dtc; +        struct dt_const     dtc; -        uint64_t           address; +        uint64_t            address; -        struct list_head   flows; -        pthread_rwlock_t   flows_lock; +        struct timerwheel * wheel; -        struct list_head   cdap_reqs; -        pthread_mutex_t    cdap_reqs_lock; +        struct list_head    ro_ids; +        pthread_mutex_t     ro_ids_lock; -        struct addr_auth * addr_auth; +        struct list_head    flows; +        pthread_rwlock_t    flows_lock; + +        struct list_head    cdap_reqs; +        pthread_mutex_t     cdap_reqs_lock; + +        struct addr_auth *  addr_auth;  } rib;  int ribmgr_ro_created(const char * name, @@ -229,6 +244,81 @@ static int ro_msg_create(struct rnode * node,          return 0;  } +static int ribmgr_ro_delete(const char * name) +{ +        char * str; +        char * str1; +        char * saveptr; +        char * token; +        struct rnode * node; +        struct rnode * prev; +        bool sibling = false; + +        str = strdup(name); +        if (str == NULL) +                return -1; + +        node = rib.root; +        prev = NULL; + +        for (str1 = str; ; str1 = NULL) { +                token = strtok_r(str1, PATH_DELIMITER, &saveptr); +                if (token == NULL) +                        break; + +                prev = node; +                node = node->child; +                sibling = false; + +                while (node != NULL) { +                        if (strcmp(node->name, token) == 0) { +                                break; +                        } else { +                                prev = node; +                                node = node->sibling; +                                sibling = true; +                        } +                } + +                if (node == NULL) { +                        free(str); +                        return -1; +                } +        } + +        if (node == rib.root) { +                LOG_ERR("Won't remove root."); +                free(str); +                return -1; +        } + +        free(node->name); +        free(node->full_name); +        if (node->data != NULL) +                free(node->data); + +        if (sibling) +                prev->sibling = node->sibling; +        else +                prev->child = node->sibling; + +        free(node); +        free(str); + +        LOG_DBG("Deleted RO with name %s.", name); + +        return 0; +} + +static void ro_delete_timer(void * o) +{ +        char * name = (char *) o; + +        if (ribmgr_ro_delete(name)) { +                LOG_ERR("Failed to delete %s.", name); +        } +} +  static struct rnode * ribmgr_ro_create(const char *      name,                                         struct ro_props * props,                                         uint8_t *         data, @@ -243,6 +333,7 @@ static struct rnode * ribmgr_ro_create(const char *      name,          struct rnode * new;          struct rnode * prev;          bool sibling; +        int timeout;          str = strdup(name);          if (str == NULL) @@ -318,71 +409,20 @@ static struct rnode * ribmgr_ro_create(const char *      name,          new->child = NULL;          new->sibling = NULL; -        return new; -} - -static int ribmgr_ro_delete(const char * name) -{ -        char * str; -        char * str1; -        char * saveptr; -        char * token; -        struct rnode * node; -        struct rnode * prev; -        bool sibling = false; - -        str = strdup(name); -        if (str == NULL) -                return -1; - -        node = rib.root; -        prev = NULL; - -        for (str1 = str; ; str1 = NULL) { -                token = strtok_r(str1, PATH_DELIMITER, &saveptr); -                if (token == NULL) -                        break; - -                prev = node; -                node = node->child; -                sibling = false; - -                while (node != NULL) { -                        if (strcmp(node->name, token) == 0) { -                                break; -                        } else { -                                prev = node; -                                node = node->sibling; -                                sibling = true; -                        } -                } +        LOG_DBG("Created RO with name %s.", name); -                if (node == NULL) { -                        free(str); -                        return -1; +        if (!(props->expiry.tv_sec == 0 && +              props->expiry.tv_nsec == 0)) { +                timeout = props->expiry.tv_sec * 1000 + +                        props->expiry.tv_nsec * MILLION; +                if (timerwheel_add(rib.wheel, ro_delete_timer, +                                   new->full_name, strlen(new->full_name), +                                   timeout)) { +                        LOG_ERR("Failed to add deletion timer of RO.");                  }          } -        if (node == rib.root) { -                LOG_ERR("Won't remove root."); -                free(str); -                return -1; -        } - -        free(node->name); -        free(node->full_name); -        if (node->data != NULL) -                free(node->data); - -        if (sibling) -                prev->sibling = node->sibling; -        else -                prev->child = node->sibling; - -        free(node); -        free(str); - -        return 0; +        return new;  }  static struct rnode * ribmgr_ro_write(const char * name, @@ -399,7 +439,8 @@ static struct rnode * ribmgr_ro_write(const char * name,          node->data = data;          node->len = len; -        node->seqno++; + +        LOG_DBG("Updated RO with name %s.", name);          return node;  } @@ -494,6 +535,7 @@ int ribmgr_init()          INIT_LIST_HEAD(&rib.flows);          INIT_LIST_HEAD(&rib.cdap_reqs);          INIT_LIST_HEAD(&rib.subs); +        INIT_LIST_HEAD(&rib.ro_ids);          rib.root = malloc(sizeof(*(rib.root)));          if (rib.root == NULL) @@ -533,6 +575,16 @@ int ribmgr_init()                  return -1;          } +        if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) { +                LOG_ERR("Failed to initialize mutex."); +                pthread_rwlock_destroy(&rib.flows_lock); +                pthread_mutex_destroy(&rib.cdap_reqs_lock); +                pthread_mutex_destroy(&rib.ro_lock); +                pthread_mutex_destroy(&rib.subs_lock); +                free(rib.root); +                return -1; +        } +          rib.sids = bmp_create(SUBS_SIZE, 0);          if (rib.sids == NULL) {                  LOG_ERR("Failed to create bitmap."); @@ -540,6 +592,20 @@ int ribmgr_init()                  pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock); +                pthread_mutex_destroy(&rib.ro_ids_lock); +                free(rib.root); +                return -1; +        } + +        rib.wheel = timerwheel_create(WHEEL_RESOLUTION, WHEEL_DELAY); +        if (rib.wheel == NULL) { +                LOG_ERR("Failed to create timerwheel."); +                bmp_destroy(rib.sids); +                pthread_rwlock_destroy(&rib.flows_lock); +                pthread_mutex_destroy(&rib.cdap_reqs_lock); +                pthread_mutex_destroy(&rib.ro_lock); +                pthread_mutex_destroy(&rib.subs_lock); +                pthread_mutex_destroy(&rib.ro_ids_lock);                  free(rib.root);                  return -1;          } @@ -547,11 +613,13 @@ int ribmgr_init()          rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops);          if (rib.ribmgr_sid < 0) {                  LOG_ERR("Failed to subscribe."); +                timerwheel_destroy(rib.wheel);                  bmp_destroy(rib.sids);                  pthread_rwlock_destroy(&rib.flows_lock);                  pthread_mutex_destroy(&rib.cdap_reqs_lock);                  pthread_mutex_destroy(&rib.ro_lock);                  pthread_mutex_destroy(&rib.subs_lock); +                pthread_mutex_destroy(&rib.ro_ids_lock);                  free(rib.root);                  return -1;          } @@ -608,11 +676,13 @@ int ribmgr_fini()          pthread_mutex_unlock(&rib.ro_lock);          bmp_destroy(rib.sids); +        timerwheel_destroy(rib.wheel);          pthread_mutex_destroy(&rib.subs_lock);          pthread_mutex_destroy(&rib.cdap_reqs_lock);          pthread_mutex_destroy(&rib.ro_lock);          pthread_rwlock_destroy(&rib.flows_lock); +        pthread_mutex_destroy(&rib.ro_ids_lock);          return 0;  } @@ -810,6 +880,7 @@ static int ribmgr_cdap_write(struct cdap * instance,                  cdap_send_reply(instance, invoke_id, -1, NULL, 0);                  return -1;          } +        node->seqno = msg->seqno;          pthread_mutex_lock(&rib.subs_lock); @@ -859,6 +930,8 @@ static int ribmgr_enrol_sync(struct cdap * instance,                                  return -1;                          } +                        LOG_DBG("Syncing RO with name %s.", node->full_name); +                          if (write_ro_msg(instance, &msg,                                           node->full_name, CDAP_CREATE)) {                                  LOG_ERR("Failed to send RO msg."); @@ -961,6 +1034,51 @@ static int ribmgr_cdap_stop(struct cdap * instance,          return 0;  } +static void ro_id_delete(void * o) +{ +        struct ro_id * ro_id = *((struct ro_id **) o); + +        pthread_mutex_lock(&rib.ro_ids_lock); +        list_del(&ro_id->next); +        free(ro_id->full_name); +        free(ro_id); +        pthread_mutex_unlock(&rib.ro_ids_lock); +} + +static int ro_id_create(char *     name, +                        ro_msg_t * msg) +{ +        struct ro_id * tmp; + +        tmp = malloc(sizeof(*tmp)); +        if (tmp == NULL) +                return -ENOMEM; + +        tmp->seqno = msg->seqno; +        tmp->full_name = strdup(name); +        INIT_LIST_HEAD(&tmp->next); + +        if (tmp->full_name == NULL) { +                free(tmp); +                return -ENOMEM; +        } + +        pthread_mutex_lock(&rib.ro_ids_lock); +        list_add(&tmp->next, &rib.ro_ids); + +        if (timerwheel_add(rib.wheel, ro_id_delete, +                           &tmp, sizeof(tmp), RO_ID_TIMEOUT)) { +                LOG_ERR("Failed to add item to timerwheel."); +                pthread_mutex_unlock(&rib.ro_ids_lock); +                free(tmp->full_name); +                free(tmp); +                return -1; +        } +        pthread_mutex_unlock(&rib.ro_ids_lock); + +        return 0; +} +  static int ribmgr_cdap_request(struct cdap *    instance,                                 int              invoke_id,                                 enum cdap_opcode opcode, @@ -989,7 +1107,20 @@ static int ribmgr_cdap_request(struct cdap *    instance,                  return -1;          } -        /* FIXME: Check if we already received this */ +        pthread_mutex_lock(&rib.ro_ids_lock); +        list_for_each(p, &rib.ro_ids) { +                struct ro_id * e = list_entry(p, struct ro_id, next); + +                if (strcmp(e->full_name, name) == 0 && +                    e->seqno == msg->seqno) { +                        pthread_mutex_unlock(&rib.ro_ids_lock); +                        ro_msg__free_unpacked(msg, NULL); +                        cdap_send_reply(instance, invoke_id, 0, NULL, 0); +                        LOG_DBG("Already received this RO."); +                        return 0; +                } +        } +        pthread_mutex_unlock(&rib.ro_ids_lock);          if (opcode == CDAP_CREATE) {                  ret = ribmgr_cdap_create(instance, @@ -1008,10 +1139,16 @@ static int ribmgr_cdap_request(struct cdap *    instance,                                           name);          } else {                  LOG_INFO("Unsupported opcode received."); +                ro_msg__free_unpacked(msg, NULL);                  cdap_send_reply(instance, invoke_id, -1, NULL, 0);                  return -1;          } +        if (ro_id_create(name, msg)) { +                LOG_ERR("Failed to create RO id."); +                return -1; +        } +          if (msg->recv_set == ALL_MEMBERS) {                  pthread_rwlock_rdlock(&rib.flows_lock);                  list_for_each(p, &rib.flows) { @@ -1364,6 +1501,7 @@ int ro_write(const char * name,                  LOG_ERR("Failed to create RO.");                  return -1;          } +        node->seqno++;          if (node->props->recv_set == NO_SYNC) {                  pthread_mutex_unlock(&rib.ro_lock); | 
