diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-05-23 13:38:03 +0200 | 
|---|---|---|
| committer | dimitri staessens <dimitri.staessens@ugent.be> | 2017-05-24 11:23:07 +0200 | 
| commit | 3d887b172c37c4418c6173048e6a317eb0c36e57 (patch) | |
| tree | e251a93bb3f573fc5010b3be418fbfaa876c9c7c /src/ipcpd | |
| parent | 78c9a10950ee93f90d83ab727b1d1d3430e2effa (diff) | |
| download | ouroboros-3d887b172c37c4418c6173048e6a317eb0c36e57.tar.gz ouroboros-3d887b172c37c4418c6173048e6a317eb0c36e57.zip | |
ipcpd: Allow registering protocol machines with DT
Other protocol machines now have to register on top of the DT AE. This
allows multiple instances of the same protocol machine and avoids
preallocating fds for each protocol machine instance.
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/dt.c | 91 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.h | 5 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 197 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.h | 2 | 
4 files changed, 176 insertions, 119 deletions
| diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index b22fb59c..1867c13b 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -23,13 +23,15 @@  #define OUROBOROS_PREFIX "dt-ae"  #include <ouroboros/config.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h>  #include <ouroboros/logs.h>  #include <ouroboros/rib.h>  #include <ouroboros/dev.h> -#include "dt.h"  #include "connmgr.h"  #include "ipcp.h" +#include "dt.h"  #include "dt_pci.h"  #include "pff.h"  #include "neighbors.h" @@ -47,12 +49,21 @@  #include <inttypes.h>  #include <assert.h> +struct ae_info { +        int    (*post_sdu)(void * ae, struct shm_du_buff * sdb); +        void * ae; +}; +  struct {          struct sdu_sched * sdu_sched;          struct pff *       pff[QOS_CUBE_MAX];          struct routing_i * routing[QOS_CUBE_MAX]; +        struct bmp *       res_fds; +        struct ae_info     aes[AP_RES_FDS]; +        pthread_rwlock_t   lock; +          struct gam *       gam;          struct nbs *       nbs;          struct ae *        ae; @@ -120,18 +131,12 @@ static int sdu_handler(int                  fd,                          return 0;                  } -                switch (dt_pci.fd) { -                case FD_FA: -                        if (fa_post_sdu(sdb)) { -                                ipcp_sdb_release(sdb); -                                return -1; -                        } -                        return 0; -                default: -                        log_err("Unknown PDU type received."); +                if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) {                          ipcp_sdb_release(sdb);                          return -1;                  } + +                return 0;          }          /* silence compiler */ @@ -167,24 +172,24 @@ int dt_init(void)          dt.ae = connmgr_ae_create(info);          if (dt.ae == NULL) {                  log_err("Failed to create AE struct."); -                return -1; +                goto fail_connmgr;          }          dt.nbs = nbs_create();          if (dt.nbs == NULL) {                  log_err("Failed to create neighbors struct."); -                goto fail_connmgr; +                goto fail_nbs;          }          dt.nb_notifier.notify_call = dt_neighbor_event;          if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {                  log_err("Failed to register notifier."); -                goto fail_nbs; +                goto fail_nbs_notifier;          }          if (routing_init(pr, dt.nbs)) {                  log_err("Failed to init routing."); -                goto fail_nbs_notifier; +                goto fail_routing;          }          for (i = 0; i < QOS_CUBE_MAX; ++i) { @@ -192,7 +197,7 @@ int dt_init(void)                  if (dt.pff[i] == NULL) {                          for (j = 0; j < i; ++j)                                  pff_destroy(dt.pff[j]); -                        goto fail_routing; +                        goto fail_pff;                  }          } @@ -201,22 +206,39 @@ int dt_init(void)                  if (dt.routing[i] == NULL) {                          for (j = 0; j < i; ++j)                                  routing_i_destroy(dt.routing[j]); -                        goto fail_pff; +                        goto fail_routing_i;                  }          } +        if (pthread_rwlock_init(&dt.lock, NULL)) { +                log_err("Failed to init rwlock."); +                goto fail_rwlock_init; +        } + +        dt.res_fds = bmp_create(AP_RES_FDS, 0); +        if (dt.res_fds == NULL) +                goto fail_res_fds; +          return 0; - fail_pff: + + fail_res_fds: +        pthread_rwlock_destroy(&dt.lock); + fail_rwlock_init: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                routing_i_destroy(dt.routing[j]); + fail_routing_i:          for (i = 0; i < QOS_CUBE_MAX; ++i)                  pff_destroy(dt.pff[i]); - fail_routing: + fail_pff:          routing_fini(); - fail_nbs_notifier: + fail_routing:          nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); - fail_nbs: + fail_nbs_notifier:          nbs_destroy(dt.nbs); - fail_connmgr: + fail_nbs:          connmgr_ae_destroy(dt.ae); + fail_connmgr: +        dt_pci_fini();          return -1;  } @@ -272,6 +294,33 @@ void dt_stop(void)          sdu_sched_destroy(dt.sdu_sched);  } +int dt_reg_ae(void * ae, +              int (* func)(void * func, struct shm_du_buff *)) +{ +        int res_fd; + +        assert(func); + +        pthread_rwlock_wrlock(&dt.lock); + +        res_fd = bmp_allocate(dt.res_fds); +        if (!bmp_is_id_valid(dt.res_fds, res_fd)) { +                log_warn("Reserved fds depleted."); +                pthread_rwlock_unlock(&dt.lock); +                return -EBADF; +        } + +        assert(dt.aes[res_fd].post_sdu == NULL); +        assert(dt.aes[res_fd].ae == NULL); + +        dt.aes[res_fd].post_sdu = func; +        dt.aes[res_fd].ae = ae; + +        pthread_rwlock_unlock(&dt.lock); + +        return res_fd; +} +  int dt_write_sdu(uint64_t             dst_addr,                   qoscube_t            qc,                   int                  np1_fd, diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 52760154..0e1a8cc3 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -37,9 +37,12 @@ int  dt_start(void);  void dt_stop(void); +int  dt_reg_ae(void * ae, +               int (* func)(void * ae, struct shm_du_buff * sdb)); +  int  dt_write_sdu(uint64_t             dst_addr,                    qoscube_t            qc, -                  int                  np1_fd, +                  int                  res_fd,                    struct shm_du_buff * sdb);  #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index d6c36a17..d7073617 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -50,6 +50,7 @@ struct {          pthread_rwlock_t   flows_lock;          int                r_fd[AP_MAX_FLOWS];          uint64_t           r_addr[AP_MAX_FLOWS]; +        int                fd;          struct sdu_sched * sdu_sched;  } fa; @@ -78,6 +79,104 @@ static void destroy_conn(int fd)          fa.r_addr[fd] = INVALID_ADDR;  } +static int fa_post_sdu(void *               ae, +                       struct shm_du_buff * sdb) +{ +        struct timespec    ts  = {0, TIMEOUT * 1000}; +        int                fd; +        flow_alloc_msg_t * msg; + +        (void) ae; + +        assert(ae == &fa); +        assert(sdb); + +        /* Depending on the message call the function in ipcp-dev.h */ + +        msg = flow_alloc_msg__unpack(NULL, +                                     shm_du_buff_tail(sdb) - +                                     shm_du_buff_head(sdb), +                                     shm_du_buff_head(sdb)); +        if (msg == NULL) { +                log_err("Failed to unpack flow alloc message."); +                return -1; +        } + +        switch (msg->code) { +        case FLOW_ALLOC_CODE__FLOW_REQ: +                pthread_mutex_lock(&ipcpi.alloc_lock); + +                if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { +                        log_err("Bad flow request."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        return -1; +                } + +                while (ipcpi.alloc_id != -1 && +                       ipcp_get_state() == IPCP_OPERATIONAL) +                        pthread_cond_timedwait(&ipcpi.alloc_cond, +                                               &ipcpi.alloc_lock, +                                               &ts); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        log_dbg("Won't allocate over non-operational IPCP."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        return -1; +                } + +                assert(ipcpi.alloc_id == -1); + +                fd = ipcp_flow_req_arr(getpid(), +                                       msg->hash.data, +                                       ipcp_dir_hash_len(), +                                       msg->qc); +                if (fd < 0) { +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        log_err("Failed to get fd for flow."); +                        return -1; +                } + +                pthread_rwlock_wrlock(&fa.flows_lock); + +                fa.r_fd[fd]   = msg->s_fd; +                fa.r_addr[fd] = msg->s_addr; + +                pthread_rwlock_unlock(&fa.flows_lock); + +                ipcpi.alloc_id = fd; +                pthread_cond_broadcast(&ipcpi.alloc_cond); + +                pthread_mutex_unlock(&ipcpi.alloc_lock); + +                break; +        case FLOW_ALLOC_CODE__FLOW_REPLY: +                pthread_rwlock_wrlock(&fa.flows_lock); + +                ipcp_flow_alloc_reply(msg->r_fd, msg->response); + +                if (msg->response < 0) +                        destroy_conn(msg->r_fd); +                else +                        sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); + +                pthread_rwlock_unlock(&fa.flows_lock); + +                break; +        default: +                log_err("Got an unknown flow allocation message."); +                flow_alloc_msg__free_unpacked(msg, NULL); +                return -1; +        } + +        flow_alloc_msg__free_unpacked(msg, NULL); +        ipcp_sdb_release(sdb); + +        return 0; +} +  int fa_init(void)  {          int i; @@ -88,6 +187,8 @@ int fa_init(void)          if (pthread_rwlock_init(&fa.flows_lock, NULL))                  return -1; +        fa.fd = dt_reg_ae(&fa, &fa_post_sdu); +          return 0;  } @@ -191,7 +292,7 @@ int fa_alloc(int             fd,          if (sdb == NULL)                  return -1; -        if (dt_write_sdu(addr, qc, FD_FA, sdb)) { +        if (dt_write_sdu(addr, qc, fa.fd, sdb)) {                  ipcp_sdb_release(sdb);                  return -1;          } @@ -287,97 +388,3 @@ int fa_dealloc(int fd)          return 0;  } - -int fa_post_sdu(struct shm_du_buff * sdb) -{ -        struct timespec    ts  = {0, TIMEOUT * 1000}; -        int                fd; -        flow_alloc_msg_t * msg; - -        assert(sdb); - -        /* Depending on the message call the function in ipcp-dev.h */ - -        msg = flow_alloc_msg__unpack(NULL, -                                     shm_du_buff_tail(sdb) - -                                     shm_du_buff_head(sdb), -                                     shm_du_buff_head(sdb)); -        if (msg == NULL) { -                log_err("Failed to unpack flow alloc message"); -                return -1; -        } - -        switch (msg->code) { -        case FLOW_ALLOC_CODE__FLOW_REQ: -                pthread_mutex_lock(&ipcpi.alloc_lock); - -                if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { -                        log_err("Bad flow request."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; -                } - -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &ts); - -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; -                } - -                assert(ipcpi.alloc_id == -1); - -                fd = ipcp_flow_req_arr(getpid(), -                                       msg->hash.data, -                                       ipcp_dir_hash_len(), -                                       msg->qc); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        log_err("Failed to get fd for flow."); -                        return -1; -                } - -                pthread_rwlock_wrlock(&fa.flows_lock); - -                fa.r_fd[fd]   = msg->s_fd; -                fa.r_addr[fd] = msg->s_addr; - -                pthread_rwlock_unlock(&fa.flows_lock); - -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); - -                pthread_mutex_unlock(&ipcpi.alloc_lock); - -                break; -        case FLOW_ALLOC_CODE__FLOW_REPLY: -                pthread_rwlock_wrlock(&fa.flows_lock); - -                ipcp_flow_alloc_reply(msg->r_fd, msg->response); - -                if (msg->response < 0) -                        destroy_conn(msg->r_fd); -                else -                        sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); - -                pthread_rwlock_unlock(&fa.flows_lock); - -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                flow_alloc_msg__free_unpacked(msg, NULL); -                return -1; -        } - -        flow_alloc_msg__free_unpacked(msg, NULL); -        ipcp_sdb_release(sdb); - -        return 0; -} diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index 264d45ea..a77dc723 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -43,6 +43,4 @@ int  fa_alloc_resp(int fd,  int  fa_dealloc(int fd); -int  fa_post_sdu(struct shm_du_buff * sdb); -  #endif /* OUROBOROS_IPCPD_NORMAL_FA_H */ | 
