summaryrefslogtreecommitdiff
path: root/src/ipcpd
diff options
context:
space:
mode:
Diffstat (limited to 'src/ipcpd')
-rw-r--r--src/ipcpd/normal/ribmgr.c300
1 files changed, 219 insertions, 81 deletions
diff --git a/src/ipcpd/normal/ribmgr.c b/src/ipcpd/normal/ribmgr.c
index 49971eda..524c5a39 100644
--- a/src/ipcpd/normal/ribmgr.c
+++ b/src/ipcpd/normal/ribmgr.c
@@ -36,6 +36,7 @@
#include <string.h>
#include <errno.h>
+#include "timerwheel.h"
#include "addr_auth.h"
#include "ribmgr.h"
#include "dt_const.h"
@@ -50,13 +51,16 @@ typedef StaticInfoMsg static_info_msg_t;
#include "ro.pb-c.h"
typedef RoMsg ro_msg_t;
-#define SUBS_SIZE 25
+#define SUBS_SIZE 25
+#define WHEEL_RESOLUTION 1000 /* ms */
+#define WHEEL_DELAY 3600000 /* ms */
+#define RO_ID_TIMEOUT 1000 /* ms */
-#define ENROLLMENT "enrollment"
+#define ENROLLMENT "enrollment"
-#define RIBMGR_PREFIX "/ribmgr"
-#define STAT_INFO "/statinfo"
-#define PATH_DELIMITER "/"
+#define RIBMGR_PREFIX "/ribmgr"
+#define STAT_INFO "/statinfo"
+#define PATH_DELIMITER "/"
/* RIB objects */
struct rnode {
@@ -92,26 +96,37 @@ struct ro_sub {
struct list_head next;
};
+struct ro_id {
+ uint64_t seqno;
+ char * full_name;
+ struct list_head next;
+};
+
struct {
- struct rnode * root;
- pthread_mutex_t ro_lock;
+ struct rnode * root;
+ pthread_mutex_t ro_lock;
- struct list_head subs;
- struct bmp * sids;
- pthread_mutex_t subs_lock;
- int ribmgr_sid;
+ struct list_head subs;
+ struct bmp * sids;
+ pthread_mutex_t subs_lock;
+ int ribmgr_sid;
- struct dt_const dtc;
+ struct dt_const dtc;
- uint64_t address;
+ uint64_t address;
- struct list_head flows;
- pthread_rwlock_t flows_lock;
+ struct timerwheel * wheel;
- struct list_head cdap_reqs;
- pthread_mutex_t cdap_reqs_lock;
+ struct list_head ro_ids;
+ pthread_mutex_t ro_ids_lock;
- struct addr_auth * addr_auth;
+ struct list_head flows;
+ pthread_rwlock_t flows_lock;
+
+ struct list_head cdap_reqs;
+ pthread_mutex_t cdap_reqs_lock;
+
+ struct addr_auth * addr_auth;
} rib;
int ribmgr_ro_created(const char * name,
@@ -229,6 +244,81 @@ static int ro_msg_create(struct rnode * node,
return 0;
}
+static int ribmgr_ro_delete(const char * name)
+{
+ char * str;
+ char * str1;
+ char * saveptr;
+ char * token;
+ struct rnode * node;
+ struct rnode * prev;
+ bool sibling = false;
+
+ str = strdup(name);
+ if (str == NULL)
+ return -1;
+
+ node = rib.root;
+ prev = NULL;
+
+ for (str1 = str; ; str1 = NULL) {
+ token = strtok_r(str1, PATH_DELIMITER, &saveptr);
+ if (token == NULL)
+ break;
+
+ prev = node;
+ node = node->child;
+ sibling = false;
+
+ while (node != NULL) {
+ if (strcmp(node->name, token) == 0) {
+ break;
+ } else {
+ prev = node;
+ node = node->sibling;
+ sibling = true;
+ }
+ }
+
+ if (node == NULL) {
+ free(str);
+ return -1;
+ }
+ }
+
+ if (node == rib.root) {
+ LOG_ERR("Won't remove root.");
+ free(str);
+ return -1;
+ }
+
+ free(node->name);
+ free(node->full_name);
+ if (node->data != NULL)
+ free(node->data);
+
+ if (sibling)
+ prev->sibling = node->sibling;
+ else
+ prev->child = node->sibling;
+
+ free(node);
+ free(str);
+
+ LOG_DBG("Deleted RO with name %s.", name);
+
+ return 0;
+}
+
+static void ro_delete_timer(void * o)
+{
+ char * name = (char *) o;
+
+ if (ribmgr_ro_delete(name)) {
+ LOG_ERR("Failed to delete %s.", name);
+ }
+}
+
static struct rnode * ribmgr_ro_create(const char * name,
struct ro_props * props,
uint8_t * data,
@@ -243,6 +333,7 @@ static struct rnode * ribmgr_ro_create(const char * name,
struct rnode * new;
struct rnode * prev;
bool sibling;
+ int timeout;
str = strdup(name);
if (str == NULL)
@@ -318,71 +409,20 @@ static struct rnode * ribmgr_ro_create(const char * name,
new->child = NULL;
new->sibling = NULL;
- return new;
-}
-
-static int ribmgr_ro_delete(const char * name)
-{
- char * str;
- char * str1;
- char * saveptr;
- char * token;
- struct rnode * node;
- struct rnode * prev;
- bool sibling = false;
-
- str = strdup(name);
- if (str == NULL)
- return -1;
-
- node = rib.root;
- prev = NULL;
-
- for (str1 = str; ; str1 = NULL) {
- token = strtok_r(str1, PATH_DELIMITER, &saveptr);
- if (token == NULL)
- break;
-
- prev = node;
- node = node->child;
- sibling = false;
-
- while (node != NULL) {
- if (strcmp(node->name, token) == 0) {
- break;
- } else {
- prev = node;
- node = node->sibling;
- sibling = true;
- }
- }
+ LOG_DBG("Created RO with name %s.", name);
- if (node == NULL) {
- free(str);
- return -1;
+ if (!(props->expiry.tv_sec == 0 &&
+ props->expiry.tv_nsec == 0)) {
+ timeout = props->expiry.tv_sec * 1000 +
+ props->expiry.tv_nsec * MILLION;
+ if (timerwheel_add(rib.wheel, ro_delete_timer,
+ new->full_name, strlen(new->full_name),
+ timeout)) {
+ LOG_ERR("Failed to add deletion timer of RO.");
}
}
- if (node == rib.root) {
- LOG_ERR("Won't remove root.");
- free(str);
- return -1;
- }
-
- free(node->name);
- free(node->full_name);
- if (node->data != NULL)
- free(node->data);
-
- if (sibling)
- prev->sibling = node->sibling;
- else
- prev->child = node->sibling;
-
- free(node);
- free(str);
-
- return 0;
+ return new;
}
static struct rnode * ribmgr_ro_write(const char * name,
@@ -399,7 +439,8 @@ static struct rnode * ribmgr_ro_write(const char * name,
node->data = data;
node->len = len;
- node->seqno++;
+
+ LOG_DBG("Updated RO with name %s.", name);
return node;
}
@@ -494,6 +535,7 @@ int ribmgr_init()
INIT_LIST_HEAD(&rib.flows);
INIT_LIST_HEAD(&rib.cdap_reqs);
INIT_LIST_HEAD(&rib.subs);
+ INIT_LIST_HEAD(&rib.ro_ids);
rib.root = malloc(sizeof(*(rib.root)));
if (rib.root == NULL)
@@ -533,6 +575,16 @@ int ribmgr_init()
return -1;
}
+ if (pthread_mutex_init(&rib.ro_ids_lock, NULL)) {
+ LOG_ERR("Failed to initialize mutex.");
+ pthread_rwlock_destroy(&rib.flows_lock);
+ pthread_mutex_destroy(&rib.cdap_reqs_lock);
+ pthread_mutex_destroy(&rib.ro_lock);
+ pthread_mutex_destroy(&rib.subs_lock);
+ free(rib.root);
+ return -1;
+ }
+
rib.sids = bmp_create(SUBS_SIZE, 0);
if (rib.sids == NULL) {
LOG_ERR("Failed to create bitmap.");
@@ -540,6 +592,20 @@ int ribmgr_init()
pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
+ pthread_mutex_destroy(&rib.ro_ids_lock);
+ free(rib.root);
+ return -1;
+ }
+
+ rib.wheel = timerwheel_create(WHEEL_RESOLUTION, WHEEL_DELAY);
+ if (rib.wheel == NULL) {
+ LOG_ERR("Failed to create timerwheel.");
+ bmp_destroy(rib.sids);
+ pthread_rwlock_destroy(&rib.flows_lock);
+ pthread_mutex_destroy(&rib.cdap_reqs_lock);
+ pthread_mutex_destroy(&rib.ro_lock);
+ pthread_mutex_destroy(&rib.subs_lock);
+ pthread_mutex_destroy(&rib.ro_ids_lock);
free(rib.root);
return -1;
}
@@ -547,11 +613,13 @@ int ribmgr_init()
rib.ribmgr_sid = ro_subscribe(RIBMGR_PREFIX, &ribmgr_sub_ops);
if (rib.ribmgr_sid < 0) {
LOG_ERR("Failed to subscribe.");
+ timerwheel_destroy(rib.wheel);
bmp_destroy(rib.sids);
pthread_rwlock_destroy(&rib.flows_lock);
pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_mutex_destroy(&rib.subs_lock);
+ pthread_mutex_destroy(&rib.ro_ids_lock);
free(rib.root);
return -1;
}
@@ -608,11 +676,13 @@ int ribmgr_fini()
pthread_mutex_unlock(&rib.ro_lock);
bmp_destroy(rib.sids);
+ timerwheel_destroy(rib.wheel);
pthread_mutex_destroy(&rib.subs_lock);
pthread_mutex_destroy(&rib.cdap_reqs_lock);
pthread_mutex_destroy(&rib.ro_lock);
pthread_rwlock_destroy(&rib.flows_lock);
+ pthread_mutex_destroy(&rib.ro_ids_lock);
return 0;
}
@@ -810,6 +880,7 @@ static int ribmgr_cdap_write(struct cdap * instance,
cdap_send_reply(instance, invoke_id, -1, NULL, 0);
return -1;
}
+ node->seqno = msg->seqno;
pthread_mutex_lock(&rib.subs_lock);
@@ -859,6 +930,8 @@ static int ribmgr_enrol_sync(struct cdap * instance,
return -1;
}
+ LOG_DBG("Syncing RO with name %s.", node->full_name);
+
if (write_ro_msg(instance, &msg,
node->full_name, CDAP_CREATE)) {
LOG_ERR("Failed to send RO msg.");
@@ -961,6 +1034,51 @@ static int ribmgr_cdap_stop(struct cdap * instance,
return 0;
}
+static void ro_id_delete(void * o)
+{
+ struct ro_id * ro_id = *((struct ro_id **) o);
+
+ pthread_mutex_lock(&rib.ro_ids_lock);
+ list_del(&ro_id->next);
+ free(ro_id->full_name);
+ free(ro_id);
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+}
+
+static int ro_id_create(char * name,
+ ro_msg_t * msg)
+{
+ struct ro_id * tmp;
+
+ tmp = malloc(sizeof(*tmp));
+ if (tmp == NULL)
+ return -ENOMEM;
+
+ tmp->seqno = msg->seqno;
+ tmp->full_name = strdup(name);
+ INIT_LIST_HEAD(&tmp->next);
+
+ if (tmp->full_name == NULL) {
+ free(tmp);
+ return -ENOMEM;
+ }
+
+ pthread_mutex_lock(&rib.ro_ids_lock);
+ list_add(&tmp->next, &rib.ro_ids);
+
+ if (timerwheel_add(rib.wheel, ro_id_delete,
+ &tmp, sizeof(tmp), RO_ID_TIMEOUT)) {
+ LOG_ERR("Failed to add item to timerwheel.");
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+ free(tmp->full_name);
+ free(tmp);
+ return -1;
+ }
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+
+ return 0;
+}
+
static int ribmgr_cdap_request(struct cdap * instance,
int invoke_id,
enum cdap_opcode opcode,
@@ -989,7 +1107,20 @@ static int ribmgr_cdap_request(struct cdap * instance,
return -1;
}
- /* FIXME: Check if we already received this */
+ pthread_mutex_lock(&rib.ro_ids_lock);
+ list_for_each(p, &rib.ro_ids) {
+ struct ro_id * e = list_entry(p, struct ro_id, next);
+
+ if (strcmp(e->full_name, name) == 0 &&
+ e->seqno == msg->seqno) {
+ pthread_mutex_unlock(&rib.ro_ids_lock);
+ ro_msg__free_unpacked(msg, NULL);
+ cdap_send_reply(instance, invoke_id, 0, NULL, 0);
+ LOG_DBG("Already received this RO.");
+ return 0;
+ }
+ }
+ pthread_mutex_unlock(&rib.ro_ids_lock);
if (opcode == CDAP_CREATE) {
ret = ribmgr_cdap_create(instance,
@@ -1008,10 +1139,16 @@ static int ribmgr_cdap_request(struct cdap * instance,
name);
} else {
LOG_INFO("Unsupported opcode received.");
+ ro_msg__free_unpacked(msg, NULL);
cdap_send_reply(instance, invoke_id, -1, NULL, 0);
return -1;
}
+ if (ro_id_create(name, msg)) {
+ LOG_ERR("Failed to create RO id.");
+ return -1;
+ }
+
if (msg->recv_set == ALL_MEMBERS) {
pthread_rwlock_rdlock(&rib.flows_lock);
list_for_each(p, &rib.flows) {
@@ -1364,6 +1501,7 @@ int ro_write(const char * name,
LOG_ERR("Failed to create RO.");
return -1;
}
+ node->seqno++;
if (node->props->recv_set == NO_SYNC) {
pthread_mutex_unlock(&rib.ro_lock);