diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/dht.c | 126 | ||||
| -rw-r--r-- | src/ipcpd/normal/dht.h | 3 | ||||
| -rw-r--r-- | src/ipcpd/normal/dir.c | 18 | ||||
| -rw-r--r-- | src/ipcpd/normal/dir.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 7 | ||||
| -rw-r--r-- | src/ipcpd/normal/main.c | 41 | ||||
| -rw-r--r-- | src/ipcpd/normal/pol/link_state.c | 32 | 
7 files changed, 139 insertions, 90 deletions
| diff --git a/src/ipcpd/normal/dht.c b/src/ipcpd/normal/dht.c index b1ba44a8..e4c37884 100644 --- a/src/ipcpd/normal/dht.c +++ b/src/ipcpd/normal/dht.c @@ -31,10 +31,12 @@  #include <ouroboros/errno.h>  #include <ouroboros/logs.h>  #include <ouroboros/list.h> +#include <ouroboros/notifier.h>  #include <ouroboros/random.h>  #include <ouroboros/time_utils.h>  #include <ouroboros/utils.h> +#include "connmgr.h"  #include "dht.h"  #include "dt.h" @@ -63,9 +65,12 @@ typedef KadContactMsg kad_contact_msg_t;  #define KAD_QUEER     15   /* Time to declare peer questionable.         */  #define KAD_BETA      8    /* Bucket split factor, must be 1, 2, 4 or 8. */  #define KAD_RESP_RETR 6    /* Number of retries on sending a response.   */ +#define KAD_JOIN_RETR 5    /* Number of retries sending a join.          */ +#define KAD_JOIN_INTV 1    /* Time (seconds) between join retries.       */  enum dht_state {          DHT_INIT = 0, +        DHT_JOINING,          DHT_RUNNING,          DHT_SHUTDOWN,  }; @@ -207,6 +212,7 @@ struct dht {          struct bmp *     cookies;          enum dht_state   state; +        pthread_cond_t   cond;          pthread_mutex_t  mtx;          pthread_rwlock_t lock; @@ -216,6 +222,11 @@ struct dht {          pthread_t        worker;  }; +struct join_info { +        struct dht * dht; +        uint64_t     addr; +}; +  static uint8_t * dht_dup_key(const uint8_t * key,                               size_t          len)  { @@ -250,9 +261,28 @@ static void dht_set_state(struct dht *   dht,          dht->state = state; +        pthread_cond_signal(&dht->cond); +          pthread_mutex_unlock(&dht->mtx);  } +static int dht_wait_running(struct dht * dht) +{ +        int ret = 0; + +        pthread_mutex_lock(&dht->mtx); + +        while (dht->state == DHT_JOINING) +                pthread_cond_wait(&dht->cond, &dht->mtx); + +        if (dht->state != DHT_RUNNING) +                ret = -1; + +        pthread_mutex_unlock(&dht->mtx); + +        return ret; +} +  static uint8_t * create_id(size_t len)  {          uint8_t * id; @@ -1626,7 +1656,6 @@ static int kad_join(struct dht * dht,                      uint64_t     addr)  {          kad_msg_t       msg = KAD_MSG__INIT; -        struct lookup * lu;          msg.code = KAD_JOIN; @@ -1657,10 +1686,6 @@ static int kad_join(struct dht * dht,          pthread_rwlock_unlock(&dht->lock); -        lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); -        if (lu != NULL) -                lookup_destroy(lu); -          return 0;  } @@ -2074,14 +2099,6 @@ int dht_bootstrap(struct dht * dht,          return -1;  } -int dht_enroll(struct dht * dht, -               uint64_t     addr) -{ -        assert(dht); - -        return kad_join(dht, addr); -} -  int dht_reg(struct dht *    dht,              const uint8_t * key)  { @@ -2222,8 +2239,9 @@ void dht_post_sdu(void *               ae,                  return;          } -        if (msg->code != KAD_RESPONSE && dht_get_state(dht) != DHT_RUNNING) { +        if (msg->code != KAD_RESPONSE && dht_wait_running(dht)) {                  kad_msg__free_unpacked(msg, NULL); +                log_dbg("Got a request message when not running.");                  return;          } @@ -2383,6 +2401,75 @@ void dht_destroy(struct dht * dht)          free(dht);  } +static void * join_thr(void * o) +{ +        struct join_info * info = (struct join_info *) o; +        struct lookup *    lu; +        size_t             retr = 0; + +        assert(info); + +        while (kad_join(info->dht, info->addr)) { +                if (retr++ == KAD_JOIN_RETR) { +                        dht_set_state(info->dht, DHT_INIT); +                        goto finish; +                } + +                sleep(KAD_JOIN_INTV); +        } + +        lu = kad_lookup(info->dht, info->dht->id, KAD_FIND_NODE); +        if (lu != NULL) +                lookup_destroy(lu); + + finish: +        free(info); + +        return (void *) 0; +} + +static void handle_event(void *       self, +                         int          event, +                         const void * o) +{ +        struct dht * dht = (struct dht *) self; + +        if (event == NOTIFY_DT_CONN_ADD) { +                struct lookup *    lu; +                pthread_t          thr; +                struct join_info * info; +                struct conn *      c     = (struct conn *) o; +                enum dht_state     state = dht_get_state(dht); + +                switch(state) { +                case DHT_INIT: +                        info = malloc(sizeof(*info)); +                        if (info == NULL) +                                break; + +                        info->dht  = dht; +                        info->addr = c->conn_info.addr; + +                        dht_set_state(dht, DHT_JOINING); + +                        if (pthread_create(&thr, NULL, join_thr, info)) { +                                dht_set_state(dht, DHT_INIT); +                                free(info); +                                return; +                        } +                        pthread_detach(thr); +                        break; +                case DHT_RUNNING: +                        lu = kad_lookup(dht, dht->id, KAD_FIND_NODE); +                        if (lu != NULL) +                                lookup_destroy(lu); +                        break; +                default: +                        break; +                } +        } +} +  struct dht * dht_create(uint64_t addr)  {          struct dht * dht; @@ -2404,6 +2491,9 @@ struct dht * dht_create(uint64_t addr)          if (pthread_mutex_init(&dht->mtx, NULL))                  goto fail_mutex; +        if (pthread_cond_init(&dht->cond, NULL)) +                goto fail_cond; +          dht->cookies = bmp_create(DHT_MAX_REQS, 1);          if (dht->cookies == NULL)                  goto fail_bmp; @@ -2413,13 +2503,17 @@ struct dht * dht_create(uint64_t addr)          dht->id   = NULL;  #ifndef __DHT_TEST__          dht->fd   = dt_reg_ae(dht, &dht_post_sdu); -#endif /* __DHT_TEST__ */ - +        notifier_reg(handle_event, dht); +#else +        (void) handle_event; +#endif          dht->state = DHT_INIT;          return dht;   fail_bmp: +        pthread_cond_destroy(&dht->cond); + fail_cond:          pthread_mutex_destroy(&dht->mtx);   fail_mutex:          pthread_rwlock_destroy(&dht->lock); diff --git a/src/ipcpd/normal/dht.h b/src/ipcpd/normal/dht.h index d893f913..e4647a08 100644 --- a/src/ipcpd/normal/dht.h +++ b/src/ipcpd/normal/dht.h @@ -36,9 +36,6 @@ int          dht_bootstrap(struct dht * dht,                             size_t       b,                             time_t       t_expire); -int          dht_enroll(struct dht * dht, -                        uint64_t     addr); -  void         dht_destroy(struct dht * dht);  int          dht_reg(struct dht *    dht, diff --git a/src/ipcpd/normal/dir.c b/src/ipcpd/normal/dir.c index 6d04c66a..0d046cd6 100644 --- a/src/ipcpd/normal/dir.c +++ b/src/ipcpd/normal/dir.c @@ -39,8 +39,6 @@  #include <inttypes.h>  #define KAD_B (hash_len(ipcpi.dir_hash_algo) * CHAR_BIT) -#define ENROL_RETR 6 -#define ENROL_INTV 1  struct dht * dht; @@ -72,22 +70,6 @@ int dir_bootstrap(void) {          return 0;  } -int dir_enroll(uint64_t addr) { -        size_t retr = 0; -        log_dbg("Enrolling directory with peer %" PRIu64 ".", addr); -        while (dht_enroll(dht, addr)) { -                if (retr++ == ENROL_RETR) -                        return -EPERM; - -                log_dbg("Directory enrollment failed, retrying..."); -                sleep(ENROL_INTV); -        } - -        log_info("Directory enrolled."); - -        return 0; -} -  int dir_reg(const uint8_t * hash)  {          return dht_reg(dht, hash); diff --git a/src/ipcpd/normal/dir.h b/src/ipcpd/normal/dir.h index 13571b68..68959836 100644 --- a/src/ipcpd/normal/dir.h +++ b/src/ipcpd/normal/dir.h @@ -29,8 +29,6 @@ void     dir_fini(void);  int      dir_bootstrap(void); -int      dir_enroll(uint64_t peer); -  int      dir_reg(const uint8_t * hash);  int      dir_unreg(const uint8_t * hash); diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 56cb5a61..2db0e7e2 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -67,11 +67,14 @@ struct {          pthread_t          listener;  } dt; -static void handle_event(int          event, +static void handle_event(void *       self, +                         int          event,                           const void * o)  {          struct conn * c; +        (void) self; +          c = (struct conn *) o;          switch (event) { @@ -182,7 +185,7 @@ int dt_init(enum pol_routing pr,                  goto fail_pci_init;          } -        if (notifier_reg(handle_event)) { +        if (notifier_reg(handle_event, NULL)) {                  log_err("Failed to register with notifier.");                  goto fail_notifier_reg;          } diff --git a/src/ipcpd/normal/main.c b/src/ipcpd/normal/main.c index e6dd6717..6cfea4bc 100644 --- a/src/ipcpd/normal/main.c +++ b/src/ipcpd/normal/main.c @@ -182,29 +182,18 @@ static int bootstrap_components(void)          return 0;  } -static int enroll_components(uint64_t peer) -{ -        if (dir_enroll(peer)) { -                log_err("Failed to enroll directory."); -                return -1; -        } - -        return 0; -} -  static int normal_ipcp_enroll(const char *      dst,                                struct dif_info * info)  { -        struct conn er_conn; -        struct conn dt_conn; +        struct conn conn; -        if (connmgr_alloc(AEID_ENROLL, dst, NULL, &er_conn)) { +        if (connmgr_alloc(AEID_ENROLL, dst, NULL, &conn)) {                  log_err("Failed to get connection.");                  goto fail_er_flow;          }          /* Get boot state from peer. */ -        if (enroll_boot(&er_conn)) { +        if (enroll_boot(&conn)) {                  log_err("Failed to get boot information.");                  goto fail_enroll_boot;          } @@ -219,29 +208,15 @@ static int normal_ipcp_enroll(const char *      dst,                  goto fail_dt_start;          } -        if (connmgr_alloc(AEID_DT, dst, NULL, &dt_conn)) { -                log_err("Failed to create a data transfer flow."); -                goto fail_dt_flow; -        } -          if (start_components()) {                  log_err("Failed to start components.");                  goto fail_start_comp;          } -        if (enroll_components(dt_conn.conn_info.addr)) { -                enroll_done(&er_conn, -1); -                log_err("Failed to enroll components."); -                goto fail_enroll_comp; -        } - -        if (enroll_done(&er_conn, 0)) +        if (enroll_done(&conn, 0))                  log_warn("Failed to confirm enrollment with peer."); -        if (connmgr_dealloc(AEID_DT, &dt_conn)) -                log_warn("Failed to deallocate data transfer flow."); - -        if (connmgr_dealloc(AEID_ENROLL, &er_conn)) +        if (connmgr_dealloc(AEID_ENROLL, &conn))                  log_warn("Failed to deallocate enrollment flow.");          log_info("Enrolled with %s.", dst); @@ -251,16 +226,12 @@ static int normal_ipcp_enroll(const char *      dst,          return 0; - fail_enroll_comp: -        stop_components();   fail_start_comp: -        connmgr_dealloc(AEID_DT, &dt_conn); - fail_dt_flow:          dt_stop();   fail_dt_start:          finalize_components();   fail_enroll_boot: -        connmgr_dealloc(AEID_ENROLL, &er_conn); +        connmgr_dealloc(AEID_ENROLL, &conn);   fail_er_flow:          return -1;  } diff --git a/src/ipcpd/normal/pol/link_state.c b/src/ipcpd/normal/pol/link_state.c index df0933c6..2823f28e 100644 --- a/src/ipcpd/normal/pol/link_state.c +++ b/src/ipcpd/normal/pol/link_state.c @@ -54,7 +54,7 @@ typedef LinkStateMsg link_state_msg_t;  #define RECALC_TIME    4  #define LS_UPDATE_TIME 15  #define LS_TIMEO       60 -#define LSA_MAX_LEN    128 +#define LSM_MAX_LEN    128  #define LSDB           "lsdb"  #ifndef CLOCK_REALTIME_COARSE @@ -416,22 +416,22 @@ static void * calculate_pff(void * o)          return (void *) 0;  } -static void send_lsa(uint64_t src, +static void send_lsm(uint64_t src,                       uint64_t dst)  { -        uint8_t            buf[LSA_MAX_LEN]; -        link_state_msg_t   lsa = LINK_STATE_MSG__INIT; +        uint8_t            buf[LSM_MAX_LEN]; +        link_state_msg_t   lsm = LINK_STATE_MSG__INIT;          size_t             len;          struct list_head * p; -        lsa.d_addr = dst; -        lsa.s_addr = src; +        lsm.d_addr = dst; +        lsm.s_addr = src; -        len = link_state_msg__get_packed_size(&lsa); +        len = link_state_msg__get_packed_size(&lsm); -        assert(len <= LSA_MAX_LEN); +        assert(len <= LSM_MAX_LEN); -        link_state_msg__pack(&lsa, buf); +        link_state_msg__pack(&lsm, buf);          list_for_each(p, &ls.nbs) {                  struct nb * nb = list_entry(p, struct nb, next); @@ -471,7 +471,7 @@ static void * lsupdate(void * o)                          }                          if (adj->src == ipcpi.dt_addr) { -                                send_lsa(adj->src, adj->dst); +                                send_lsm(adj->src, adj->dst);                                  adj->stamp = now.tv_sec;                          }                  } @@ -526,7 +526,7 @@ static void * lsreader(void * o)  {          fqueue_t * fq;          int        ret; -        uint8_t    buf[LSA_MAX_LEN]; +        uint8_t    buf[LSM_MAX_LEN];          size_t     len;          int        fd;          qosspec_t  qs; @@ -551,7 +551,7 @@ static void * lsreader(void * o)                  while ((fd = fqueue_next(fq)) >= 0) {                          link_state_msg_t * msg; -                        len = flow_read(fd, buf, LSA_MAX_LEN); +                        len = flow_read(fd, buf, LSM_MAX_LEN);                          if (len <= 0)                                  continue; @@ -574,13 +574,16 @@ static void * lsreader(void * o)          return (void *) 0;  } -static void handle_event(int          event, +static void handle_event(void *       self, +                         int          event,                           const void * o)  {          /* FIXME: Apply correct QoS on graph */          struct conn * c;          qosspec_t     qs; +        (void) self; +          c = (struct conn *) o;          memset(&qs, 0, sizeof(qs)); @@ -592,6 +595,7 @@ static void handle_event(int          event,                  if (lsdb_add_link(ipcpi.dt_addr, c->conn_info.addr, &qs))                          log_dbg("Failed to add adjacency to LSDB."); +                send_lsm(ipcpi.dt_addr, c->conn_info.addr);                  break;          case NOTIFY_DT_CONN_DEL:                  if (lsdb_del_nb(c->conn_info.addr, c->flow_info.fd)) @@ -666,7 +670,7 @@ int link_state_init(void)          if (ls.graph == NULL)                  goto fail_graph; -        if (notifier_reg(handle_event)) +        if (notifier_reg(handle_event, NULL))                  goto fail_notifier_reg;          if (pthread_rwlock_init(&ls.db_lock, NULL)) | 
