diff options
| author | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-19 18:36:59 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-04-19 18:43:45 +0200 | 
| commit | fa70f932b4a6ad5849e1fd68906157619c09341f (patch) | |
| tree | 8a32bc6bba24b60177c119f3e6ce16c2ae1fea72 /src/ipcpd | |
| parent | bfc295d012efe5117734a2d6799c90af61102a68 (diff) | |
| download | ouroboros-fa70f932b4a6ad5849e1fd68906157619c09341f.tar.gz ouroboros-fa70f932b4a6ad5849e1fd68906157619c09341f.zip | |
ipcpd: normal: Extract SDU scheduling component
This extracts the SDU scheduling component out of the Flow Manager
since the functionality was duplicated. For both the N-1 and N+1 flow
sets an SDU scheduling component is now created.
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 193 | ||||
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.c | 131 | ||||
| -rw-r--r-- | src/ipcpd/normal/sdu_sched.h | 38 | 
4 files changed, 236 insertions, 127 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 69615d0c..2045b8df 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -32,6 +32,7 @@ set(SOURCE_FILES    pff.c    ribmgr.c    routing.c +  sdu_sched.c    shm_pci.c    # Add policies last    pol/complete.c diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index ba36812f..d055b311 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -42,6 +42,7 @@  #include "neighbors.h"  #include "gam.h"  #include "routing.h" +#include "sdu_sched.h"  #include <stdlib.h>  #include <stdbool.h> @@ -55,18 +56,15 @@ typedef FlowAllocMsg flow_alloc_msg_t;  #define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */  struct { -        flow_set_t *       np1_set[QOS_CUBE_MAX]; -        fqueue_t *         np1_fqs[QOS_CUBE_MAX];          pthread_rwlock_t   np1_flows_lock; -          cep_id_t           np1_fd_to_cep_id[AP_MAX_FLOWS];          int                np1_cep_id_to_fd[IPCPD_MAX_CONNS]; -        pthread_t          np1_sdu_reader; +        flow_set_t *       np1_set[QOS_CUBE_MAX]; +        struct sdu_sched * np1_sdu_sched;          flow_set_t *       nm1_set[QOS_CUBE_MAX]; -        fqueue_t *         nm1_fqs[QOS_CUBE_MAX]; -        pthread_t          nm1_sdu_reader; +        struct sdu_sched * nm1_sdu_sched;          struct pff *       pff[QOS_CUBE_MAX];          struct routing_i * routing[QOS_CUBE_MAX]; @@ -102,125 +100,70 @@ static int fmgr_neighbor_event(enum nb_event event,          return 0;  } -static void * fmgr_np1_sdu_reader(void * o) +static int np1_sdu_handler(int                  fd, +                           qoscube_t            qc, +                           struct shm_du_buff * sdb)  { -        struct shm_du_buff * sdb; -        struct timespec      timeout = {0, FD_UPDATE_TIMEOUT}; -        int                  fd; -        int                  i = 0; -        int                  ret; - -        (void) o; - -        while (true) { -                /* FIXME: replace with scheduling policy call */ -                i = (i + 1) % QOS_CUBE_MAX; - -                ret = flow_event_wait(fmgr.np1_set[i], -                                      fmgr.np1_fqs[i], -                                      &timeout); -                if (ret == -ETIMEDOUT) -                        continue; - -                if (ret < 0) { -                        log_warn("Event error: %d.", ret); -                        continue; -                } - -                while ((fd = fqueue_next(fmgr.np1_fqs[i])) >= 0) { -                        if (ipcp_flow_read(fd, &sdb)) { -                                log_warn("Failed to read SDU from fd %d.", fd); -                                continue; -                        } +        (void) qc; -                        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - -                        if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { -                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                                ipcp_flow_del(sdb); -                                log_warn("Failed to hand SDU to FRCT."); -                                continue; -                        } +        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                } +        if (frct_i_write_sdu(fmgr.np1_fd_to_cep_id[fd], sdb)) { +                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                ipcp_flow_del(sdb); +                log_warn("Failed to hand SDU to FRCT."); +                return -1;          } -        return (void *) 0; +        pthread_rwlock_unlock(&fmgr.np1_flows_lock); + +        return 0;  } -void * fmgr_nm1_sdu_reader(void * o) +static int nm1_sdu_handler(int                  fd, +                           qoscube_t            qc, +                           struct shm_du_buff * sdb)  { -        struct timespec      timeout = {0, FD_UPDATE_TIMEOUT}; -        struct shm_du_buff * sdb; -        struct pci           pci; -        int                  fd; -        int                  i = 0; -        int                  ret; - -        (void) o; +        struct pci pci;          memset(&pci, 0, sizeof(pci)); -        while (true) { -                /* FIXME: replace with scheduling policy call */ -                i = (i + 1) % QOS_CUBE_MAX; +        shm_pci_des(sdb, &pci); + +        if (pci.dst_addr != ipcpi.dt_addr) { +                if (pci.ttl == 0) { +                        log_dbg("TTL was zero."); +                        ipcp_flow_del(sdb); +                        return 0; +                } + +                pff_lock(fmgr.pff[qc]); + +                fd = pff_nhop(fmgr.pff[qc], pci.dst_addr); +                if (fd < 0) { +                        pff_unlock(fmgr.pff[qc]); +                        log_err("No next hop for %" PRIu64, pci.dst_addr); +                        ipcp_flow_del(sdb); +                        return -1; +                } -                ret = flow_event_wait(fmgr.nm1_set[i], -                                      fmgr.nm1_fqs[i], -                                      &timeout); -                if (ret == -ETIMEDOUT) -                        continue; +                pff_unlock(fmgr.pff[qc]); -                if (ret < 0) { -                        log_err("Event error: %d.", ret); -                        continue; +                if (ipcp_flow_write(fd, sdb)) { +                        log_err("Failed to write SDU to fd %d.", fd); +                        ipcp_flow_del(sdb); +                        return -1;                  } +        } else { +                shm_pci_shrink(sdb); -                while ((fd = fqueue_next(fmgr.nm1_fqs[i])) >= 0) { -                        if (ipcp_flow_read(fd, &sdb)) { -                                log_err("Failed to read SDU from fd %d.", fd); -                                continue; -                        } - -                        shm_pci_des(sdb, &pci); - -                        if (pci.dst_addr != ipcpi.dt_addr) { -                                if (pci.ttl == 0) { -                                        log_dbg("TTL was zero."); -                                        ipcp_flow_del(sdb); -                                        continue; -                                } - -                                pff_lock(fmgr.pff[i]); -                                fd = pff_nhop(fmgr.pff[i], pci.dst_addr); -                                if (fd < 0) { -                                        pff_unlock(fmgr.pff[i]); -                                        log_err("No next hop for %" PRIu64, -                                                pci.dst_addr); -                                        ipcp_flow_del(sdb); -                                        continue; -                                } -                                pff_unlock(fmgr.pff[i]); - -                                if (ipcp_flow_write(fd, sdb)) { -                                        log_err("Failed to write SDU to fd %d.", -                                                fd); -                                        ipcp_flow_del(sdb); -                                        continue; -                                } -                        } else { -                                shm_pci_shrink(sdb); - -                                if (frct_nm1_post_sdu(&pci, sdb)) { -                                        log_err("Failed to hand PDU to FRCT."); -                                        continue; -                                } -                        } +                if (frct_nm1_post_sdu(&pci, sdb)) { +                        log_err("Failed to hand PDU to FRCT."); +                        return -1;                  }          } -        return (void *) 0; +        return 0;  }  static void fmgr_destroy_flows(void) @@ -230,8 +173,6 @@ static void fmgr_destroy_flows(void)          for (i = 0; i < QOS_CUBE_MAX; ++i) {                  flow_set_destroy(fmgr.nm1_set[i]);                  flow_set_destroy(fmgr.np1_set[i]); -                fqueue_destroy(fmgr.nm1_fqs[i]); -                fqueue_destroy(fmgr.np1_fqs[i]);          }  } @@ -270,23 +211,11 @@ int fmgr_init(void)                          return -1;                  } -                fmgr.np1_fqs[i] = fqueue_create(); -                if (fmgr.np1_fqs[i] == NULL) { -                        fmgr_destroy_flows(); -                        return -1; -                } -                  fmgr.nm1_set[i] = flow_set_create();                  if (fmgr.nm1_set[i] == NULL) {                          fmgr_destroy_flows();                          return -1;                  } - -                fmgr.nm1_fqs[i] = fqueue_create(); -                if (fmgr.nm1_fqs[i] == NULL) { -                        fmgr_destroy_flows(); -                        return -1; -                }          }          if (shm_pci_init()) { @@ -410,19 +339,29 @@ int fmgr_start(void)                  return -1;          } -        pthread_create(&fmgr.np1_sdu_reader, NULL, fmgr_np1_sdu_reader, NULL); -        pthread_create(&fmgr.nm1_sdu_reader, NULL, fmgr_nm1_sdu_reader, NULL); +        fmgr.nm1_sdu_sched = sdu_sched_create(fmgr.nm1_set, nm1_sdu_handler); +        if (fmgr.nm1_sdu_sched == NULL) { +                log_err("Failed to create N-1 SDU scheduler."); +                gam_destroy(fmgr.gam); +                return -1; +        } + +        fmgr.np1_sdu_sched = sdu_sched_create(fmgr.np1_set, np1_sdu_handler); +        if (fmgr.np1_sdu_sched == NULL) { +                log_err("Failed to create N+1 SDU scheduler."); +                sdu_sched_destroy(fmgr.nm1_sdu_sched); +                gam_destroy(fmgr.gam); +                return -1; +        }          return 0;  }  void fmgr_stop(void)  { -        pthread_cancel(fmgr.np1_sdu_reader); -        pthread_cancel(fmgr.nm1_sdu_reader); +        sdu_sched_destroy(fmgr.np1_sdu_sched); -        pthread_join(fmgr.np1_sdu_reader, NULL); -        pthread_join(fmgr.nm1_sdu_reader, NULL); +        sdu_sched_destroy(fmgr.nm1_sdu_sched);          gam_destroy(fmgr.gam);  } diff --git a/src/ipcpd/normal/sdu_sched.c b/src/ipcpd/normal/sdu_sched.c new file mode 100644 index 00000000..14fc672f --- /dev/null +++ b/src/ipcpd/normal/sdu_sched.c @@ -0,0 +1,131 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * SDU scheduler component + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#define OUROBOROS_PREFIX "sdu-scheduler" + +#include <ouroboros/config.h> +#include <ouroboros/logs.h> +#include <ouroboros/errno.h> + +#include "sdu_sched.h" + +#include <stdbool.h> +#include <assert.h> +#include <stdlib.h> + +#define FD_UPDATE_TIMEOUT 10000 /* nanoseconds */ + +struct sdu_sched { +        flow_set_t * set[QOS_CUBE_MAX]; +        fqueue_t *   fqs[QOS_CUBE_MAX]; +        next_sdu_t   callback; +        pthread_t    sdu_reader; +}; + +static void * sdu_reader(void * o) +{ +        struct sdu_sched *   sched; +        struct shm_du_buff * sdb; +        struct timespec      timeout = {0, FD_UPDATE_TIMEOUT}; +        int                  fd; +        int                  i = 0; +        int                  ret; + +        sched = (struct sdu_sched *) o; + +        while (true) { +                /* FIXME: replace with scheduling policy call */ +                i = (i + 1) % QOS_CUBE_MAX; + +                ret = flow_event_wait(sched->set[i], sched->fqs[i], &timeout); +                if (ret == -ETIMEDOUT) +                        continue; + +                if (ret < 0) { +                        log_warn("Event error: %d.", ret); +                        continue; +                } + +                while ((fd = fqueue_next(sched->fqs[i])) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                log_warn("Failed to read SDU from fd %d.", fd); +                                continue; +                        } + +                        if (sched->callback(fd, i, sdb)) { +                                log_warn("Callback reported an error."); +                                continue; +                        } +                } +        } + +        return (void *) 0; +} + +struct sdu_sched * sdu_sched_create(flow_set_t * set[QOS_CUBE_MAX], +                                    next_sdu_t   callback) +{ +        struct sdu_sched * sdu_sched; +        int                i; +        int                j; + +        sdu_sched = malloc(sizeof(*sdu_sched)); +        if (sdu_sched == NULL) +                return NULL; + +        sdu_sched->callback = callback; + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                sdu_sched->set[i] = set[i]; + +                sdu_sched->fqs[i] = fqueue_create(); +                if (sdu_sched->fqs[i] == NULL) { +                        for (j = i; j >= 0; --j) +                                fqueue_destroy(sdu_sched->fqs[j]); +                        free(sdu_sched); +                        return NULL; +                } +        } + +        pthread_create(&sdu_sched->sdu_reader, +                       NULL, +                       sdu_reader, +                       (void *) sdu_sched); + +        return sdu_sched; +} + +void sdu_sched_destroy(struct sdu_sched * sdu_sched) +{ +        int i; + +        assert(sdu_sched); + +        pthread_cancel(sdu_sched->sdu_reader); + +        pthread_join(sdu_sched->sdu_reader, NULL); + +        for (i = 0; i < QOS_CUBE_MAX; ++i) +                fqueue_destroy(sdu_sched->fqs[i]); + +        free(sdu_sched); +} diff --git a/src/ipcpd/normal/sdu_sched.h b/src/ipcpd/normal/sdu_sched.h new file mode 100644 index 00000000..1a22b041 --- /dev/null +++ b/src/ipcpd/normal/sdu_sched.h @@ -0,0 +1,38 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2017 + * + * SDU scheduler component + * + *    Dimitri Staessens <dimitri.staessens@ugent.be> + *    Sander Vrijders   <sander.vrijders@ugent.be> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H +#define OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H + +#include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> + +typedef int (* next_sdu_t)(int                  fd, +                           qoscube_t            qc, +                           struct shm_du_buff * sdb); + +struct sdu_sched * sdu_sched_create(flow_set_t * set[QOS_CUBE_MAX], +                                    next_sdu_t   callback); + +void               sdu_sched_destroy(struct sdu_sched * sdu_sched); + +#endif /* OUROBOROS_IPCPD_NORMAL_SDU_SCHED_H */ | 
