diff options
| author | dimitri staessens <dimitri.staessens@ugent.be> | 2017-05-24 09:54:12 +0000 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-05-24 09:54:12 +0000 | 
| commit | be49633c7069c2a2273add8c1dd0425a8f25dd1e (patch) | |
| tree | 4647e7be5bf83ca853b32b35d926cbc0450ddb48 /src/ipcpd | |
| parent | 6cbe31eb5a14eaa993200fe7fe5710aab4b999f3 (diff) | |
| parent | 3d887b172c37c4418c6173048e6a317eb0c36e57 (diff) | |
| download | ouroboros-be49633c7069c2a2273add8c1dd0425a8f25dd1e.tar.gz ouroboros-be49633c7069c2a2273add8c1dd0425a8f25dd1e.zip | |
Merged in dstaesse/ouroboros/be-normal-dt-reg (pull request #510)
ipcpd: Allow registering protocol machines with DT
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/dt.c | 91 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.h | 5 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.c | 197 | ||||
| -rw-r--r-- | src/ipcpd/normal/fa.h | 2 | 
4 files changed, 176 insertions, 119 deletions
| diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index b22fb59c..1867c13b 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -23,13 +23,15 @@  #define OUROBOROS_PREFIX "dt-ae"  #include <ouroboros/config.h> +#include <ouroboros/bitmap.h> +#include <ouroboros/errno.h>  #include <ouroboros/logs.h>  #include <ouroboros/rib.h>  #include <ouroboros/dev.h> -#include "dt.h"  #include "connmgr.h"  #include "ipcp.h" +#include "dt.h"  #include "dt_pci.h"  #include "pff.h"  #include "neighbors.h" @@ -47,12 +49,21 @@  #include <inttypes.h>  #include <assert.h> +struct ae_info { +        int    (*post_sdu)(void * ae, struct shm_du_buff * sdb); +        void * ae; +}; +  struct {          struct sdu_sched * sdu_sched;          struct pff *       pff[QOS_CUBE_MAX];          struct routing_i * routing[QOS_CUBE_MAX]; +        struct bmp *       res_fds; +        struct ae_info     aes[AP_RES_FDS]; +        pthread_rwlock_t   lock; +          struct gam *       gam;          struct nbs *       nbs;          struct ae *        ae; @@ -120,18 +131,12 @@ static int sdu_handler(int                  fd,                          return 0;                  } -                switch (dt_pci.fd) { -                case FD_FA: -                        if (fa_post_sdu(sdb)) { -                                ipcp_sdb_release(sdb); -                                return -1; -                        } -                        return 0; -                default: -                        log_err("Unknown PDU type received."); +                if (dt.aes[dt_pci.fd].post_sdu(dt.aes[dt_pci.fd].ae, sdb)) {                          ipcp_sdb_release(sdb);                          return -1;                  } + +                return 0;          }          /* silence compiler */ @@ -167,24 +172,24 @@ int dt_init(void)          dt.ae = connmgr_ae_create(info);          if (dt.ae == NULL) {                  log_err("Failed to create AE struct."); -                return -1; +                goto fail_connmgr;          }          dt.nbs = nbs_create();          if (dt.nbs == NULL) {                  log_err("Failed to create neighbors struct."); -                goto fail_connmgr; +                goto fail_nbs;          }          dt.nb_notifier.notify_call = dt_neighbor_event;          if (nbs_reg_notifier(dt.nbs, &dt.nb_notifier)) {                  log_err("Failed to register notifier."); -                goto fail_nbs; +                goto fail_nbs_notifier;          }          if (routing_init(pr, dt.nbs)) {                  log_err("Failed to init routing."); -                goto fail_nbs_notifier; +                goto fail_routing;          }          for (i = 0; i < QOS_CUBE_MAX; ++i) { @@ -192,7 +197,7 @@ int dt_init(void)                  if (dt.pff[i] == NULL) {                          for (j = 0; j < i; ++j)                                  pff_destroy(dt.pff[j]); -                        goto fail_routing; +                        goto fail_pff;                  }          } @@ -201,22 +206,39 @@ int dt_init(void)                  if (dt.routing[i] == NULL) {                          for (j = 0; j < i; ++j)                                  routing_i_destroy(dt.routing[j]); -                        goto fail_pff; +                        goto fail_routing_i;                  }          } +        if (pthread_rwlock_init(&dt.lock, NULL)) { +                log_err("Failed to init rwlock."); +                goto fail_rwlock_init; +        } + +        dt.res_fds = bmp_create(AP_RES_FDS, 0); +        if (dt.res_fds == NULL) +                goto fail_res_fds; +          return 0; - fail_pff: + + fail_res_fds: +        pthread_rwlock_destroy(&dt.lock); + fail_rwlock_init: +        for (j = 0; j < QOS_CUBE_MAX; ++j) +                routing_i_destroy(dt.routing[j]); + fail_routing_i:          for (i = 0; i < QOS_CUBE_MAX; ++i)                  pff_destroy(dt.pff[i]); - fail_routing: + fail_pff:          routing_fini(); - fail_nbs_notifier: + fail_routing:          nbs_unreg_notifier(dt.nbs, &dt.nb_notifier); - fail_nbs: + fail_nbs_notifier:          nbs_destroy(dt.nbs); - fail_connmgr: + fail_nbs:          connmgr_ae_destroy(dt.ae); + fail_connmgr: +        dt_pci_fini();          return -1;  } @@ -272,6 +294,33 @@ void dt_stop(void)          sdu_sched_destroy(dt.sdu_sched);  } +int dt_reg_ae(void * ae, +              int (* func)(void * func, struct shm_du_buff *)) +{ +        int res_fd; + +        assert(func); + +        pthread_rwlock_wrlock(&dt.lock); + +        res_fd = bmp_allocate(dt.res_fds); +        if (!bmp_is_id_valid(dt.res_fds, res_fd)) { +                log_warn("Reserved fds depleted."); +                pthread_rwlock_unlock(&dt.lock); +                return -EBADF; +        } + +        assert(dt.aes[res_fd].post_sdu == NULL); +        assert(dt.aes[res_fd].ae == NULL); + +        dt.aes[res_fd].post_sdu = func; +        dt.aes[res_fd].ae = ae; + +        pthread_rwlock_unlock(&dt.lock); + +        return res_fd; +} +  int dt_write_sdu(uint64_t             dst_addr,                   qoscube_t            qc,                   int                  np1_fd, diff --git a/src/ipcpd/normal/dt.h b/src/ipcpd/normal/dt.h index 52760154..0e1a8cc3 100644 --- a/src/ipcpd/normal/dt.h +++ b/src/ipcpd/normal/dt.h @@ -37,9 +37,12 @@ int  dt_start(void);  void dt_stop(void); +int  dt_reg_ae(void * ae, +               int (* func)(void * ae, struct shm_du_buff * sdb)); +  int  dt_write_sdu(uint64_t             dst_addr,                    qoscube_t            qc, -                  int                  np1_fd, +                  int                  res_fd,                    struct shm_du_buff * sdb);  #endif /* OUROBOROS_IPCPD_NORMAL_DT_H */ diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c index d6c36a17..d7073617 100644 --- a/src/ipcpd/normal/fa.c +++ b/src/ipcpd/normal/fa.c @@ -50,6 +50,7 @@ struct {          pthread_rwlock_t   flows_lock;          int                r_fd[AP_MAX_FLOWS];          uint64_t           r_addr[AP_MAX_FLOWS]; +        int                fd;          struct sdu_sched * sdu_sched;  } fa; @@ -78,6 +79,104 @@ static void destroy_conn(int fd)          fa.r_addr[fd] = INVALID_ADDR;  } +static int fa_post_sdu(void *               ae, +                       struct shm_du_buff * sdb) +{ +        struct timespec    ts  = {0, TIMEOUT * 1000}; +        int                fd; +        flow_alloc_msg_t * msg; + +        (void) ae; + +        assert(ae == &fa); +        assert(sdb); + +        /* Depending on the message call the function in ipcp-dev.h */ + +        msg = flow_alloc_msg__unpack(NULL, +                                     shm_du_buff_tail(sdb) - +                                     shm_du_buff_head(sdb), +                                     shm_du_buff_head(sdb)); +        if (msg == NULL) { +                log_err("Failed to unpack flow alloc message."); +                return -1; +        } + +        switch (msg->code) { +        case FLOW_ALLOC_CODE__FLOW_REQ: +                pthread_mutex_lock(&ipcpi.alloc_lock); + +                if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { +                        log_err("Bad flow request."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        return -1; +                } + +                while (ipcpi.alloc_id != -1 && +                       ipcp_get_state() == IPCP_OPERATIONAL) +                        pthread_cond_timedwait(&ipcpi.alloc_cond, +                                               &ipcpi.alloc_lock, +                                               &ts); + +                if (ipcp_get_state() != IPCP_OPERATIONAL) { +                        log_dbg("Won't allocate over non-operational IPCP."); +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        return -1; +                } + +                assert(ipcpi.alloc_id == -1); + +                fd = ipcp_flow_req_arr(getpid(), +                                       msg->hash.data, +                                       ipcp_dir_hash_len(), +                                       msg->qc); +                if (fd < 0) { +                        pthread_mutex_unlock(&ipcpi.alloc_lock); +                        flow_alloc_msg__free_unpacked(msg, NULL); +                        log_err("Failed to get fd for flow."); +                        return -1; +                } + +                pthread_rwlock_wrlock(&fa.flows_lock); + +                fa.r_fd[fd]   = msg->s_fd; +                fa.r_addr[fd] = msg->s_addr; + +                pthread_rwlock_unlock(&fa.flows_lock); + +                ipcpi.alloc_id = fd; +                pthread_cond_broadcast(&ipcpi.alloc_cond); + +                pthread_mutex_unlock(&ipcpi.alloc_lock); + +                break; +        case FLOW_ALLOC_CODE__FLOW_REPLY: +                pthread_rwlock_wrlock(&fa.flows_lock); + +                ipcp_flow_alloc_reply(msg->r_fd, msg->response); + +                if (msg->response < 0) +                        destroy_conn(msg->r_fd); +                else +                        sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); + +                pthread_rwlock_unlock(&fa.flows_lock); + +                break; +        default: +                log_err("Got an unknown flow allocation message."); +                flow_alloc_msg__free_unpacked(msg, NULL); +                return -1; +        } + +        flow_alloc_msg__free_unpacked(msg, NULL); +        ipcp_sdb_release(sdb); + +        return 0; +} +  int fa_init(void)  {          int i; @@ -88,6 +187,8 @@ int fa_init(void)          if (pthread_rwlock_init(&fa.flows_lock, NULL))                  return -1; +        fa.fd = dt_reg_ae(&fa, &fa_post_sdu); +          return 0;  } @@ -191,7 +292,7 @@ int fa_alloc(int             fd,          if (sdb == NULL)                  return -1; -        if (dt_write_sdu(addr, qc, FD_FA, sdb)) { +        if (dt_write_sdu(addr, qc, fa.fd, sdb)) {                  ipcp_sdb_release(sdb);                  return -1;          } @@ -287,97 +388,3 @@ int fa_dealloc(int fd)          return 0;  } - -int fa_post_sdu(struct shm_du_buff * sdb) -{ -        struct timespec    ts  = {0, TIMEOUT * 1000}; -        int                fd; -        flow_alloc_msg_t * msg; - -        assert(sdb); - -        /* Depending on the message call the function in ipcp-dev.h */ - -        msg = flow_alloc_msg__unpack(NULL, -                                     shm_du_buff_tail(sdb) - -                                     shm_du_buff_head(sdb), -                                     shm_du_buff_head(sdb)); -        if (msg == NULL) { -                log_err("Failed to unpack flow alloc message"); -                return -1; -        } - -        switch (msg->code) { -        case FLOW_ALLOC_CODE__FLOW_REQ: -                pthread_mutex_lock(&ipcpi.alloc_lock); - -                if (!msg->has_hash || !msg->has_s_fd || !msg->has_s_addr) { -                        log_err("Bad flow request."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; -                } - -                while (ipcpi.alloc_id != -1 && -                       ipcp_get_state() == IPCP_OPERATIONAL) -                        pthread_cond_timedwait(&ipcpi.alloc_cond, -                                               &ipcpi.alloc_lock, -                                               &ts); - -                if (ipcp_get_state() != IPCP_OPERATIONAL) { -                        log_dbg("Won't allocate over non-operational IPCP."); -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        return -1; -                } - -                assert(ipcpi.alloc_id == -1); - -                fd = ipcp_flow_req_arr(getpid(), -                                       msg->hash.data, -                                       ipcp_dir_hash_len(), -                                       msg->qc); -                if (fd < 0) { -                        pthread_mutex_unlock(&ipcpi.alloc_lock); -                        flow_alloc_msg__free_unpacked(msg, NULL); -                        log_err("Failed to get fd for flow."); -                        return -1; -                } - -                pthread_rwlock_wrlock(&fa.flows_lock); - -                fa.r_fd[fd]   = msg->s_fd; -                fa.r_addr[fd] = msg->s_addr; - -                pthread_rwlock_unlock(&fa.flows_lock); - -                ipcpi.alloc_id = fd; -                pthread_cond_broadcast(&ipcpi.alloc_cond); - -                pthread_mutex_unlock(&ipcpi.alloc_lock); - -                break; -        case FLOW_ALLOC_CODE__FLOW_REPLY: -                pthread_rwlock_wrlock(&fa.flows_lock); - -                ipcp_flow_alloc_reply(msg->r_fd, msg->response); - -                if (msg->response < 0) -                        destroy_conn(msg->r_fd); -                else -                        sdu_sched_add(fa.sdu_sched, fa.r_fd[msg->r_fd]); - -                pthread_rwlock_unlock(&fa.flows_lock); - -                break; -        default: -                log_err("Got an unknown flow allocation message."); -                flow_alloc_msg__free_unpacked(msg, NULL); -                return -1; -        } - -        flow_alloc_msg__free_unpacked(msg, NULL); -        ipcp_sdb_release(sdb); - -        return 0; -} diff --git a/src/ipcpd/normal/fa.h b/src/ipcpd/normal/fa.h index 264d45ea..a77dc723 100644 --- a/src/ipcpd/normal/fa.h +++ b/src/ipcpd/normal/fa.h @@ -43,6 +43,4 @@ int  fa_alloc_resp(int fd,  int  fa_dealloc(int fd); -int  fa_post_sdu(struct shm_du_buff * sdb); -  #endif /* OUROBOROS_IPCPD_NORMAL_FA_H */ | 
