From 3be4baa7e19133f0c41188bb1c2ee6ae3a82d470 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Thu, 12 Apr 2018 11:54:47 +0200 Subject: model & irati: add QOS cubes Fixes #45 Only implemented in IRATI, other prototypes warn and ignore. --- rumba/elements/topology.py | 39 ++++++++ rumba/prototypes/irati.py | 18 +++- rumba/prototypes/irati_templates.py | 188 ++++++++++++++++++++++++------------ rumba/prototypes/ouroboros.py | 5 + rumba/prototypes/rlite.py | 5 + 5 files changed, 190 insertions(+), 65 deletions(-) diff --git a/rumba/elements/topology.py b/rumba/elements/topology.py index 24dcfa2..0ff1957 100644 --- a/rumba/elements/topology.py +++ b/rumba/elements/topology.py @@ -212,6 +212,8 @@ class NormalDIF(DIF): if policy is None: policy = Policy(self) self.policy = policy + self.qos_cubes = [] + self._last_cube_id = 0 def add_policy(self, comp, pol, **params): """ @@ -243,6 +245,43 @@ class NormalDIF(DIF): % (comp, pol, params) return s + def add_qos_cube(self, name, **kwargs): + """ + Adds a QoS Cube to this DIF + + :param name: the name to be assigned to the QoS cube + :type name: `str` + :param kwargs: the parameters of the QoS cube (prototype dependent) + """ + self.del_qos_cube(name, strict=False) + c_id = self._last_cube_id + 1 + self._last_cube_id = c_id + kwargs["name"] = name + kwargs["cube_id"] = c_id + self.qos_cubes.append(kwargs) + + def del_qos_cube(self, name, strict=True): + """ + Deletes a QoS cube from this DIF + + :param name: the name of the cube to delete + :type name: `str` + :param strict: if no cube with the provided name exists, + raise an exception if and only if `strict` is `True` + :type strict: `bool` + """ + for i, cube in enumerate(self.qos_cubes): + if cube["name"] == name: + index = i + break + else: # no match + if strict: + raise ValueError("No cube with name %s found in dif %s" + % (name, self.name)) + else: + return + self.qos_cubes.pop(index) + class Distribution(Enum): """ diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py index 120a72e..37f7ae7 100644 --- a/rumba/prototypes/irati.py +++ b/rumba/prototypes/irati.py @@ -316,9 +316,25 @@ class Experiment(mod.Experiment): ipcp2shim_map.update({ipcp.name: dif for ipcp in dif.ipcps}) elif isinstance(dif, mod.NormalDIF): difconfs[dif.name] = dict() + # Generate base conf + dif_conf = copy.deepcopy(irati_templates.normal_dif_base) + # push qos_cubes + if len(dif.qos_cubes) != 0: + dif_conf["qosCubes"] = [] + for cube in dif.qos_cubes: + dif_conf["qosCubes"].append( + irati_templates.generate_qos_cube(**cube) + ) + # Add basic cubes + unreliable = copy.deepcopy(irati_templates.qos_cube_u_base) + unreliable["id"] = len(dif_conf["qosCubes"]) + 1 + dif_conf["qosCubes"].append(unreliable) + reliable = copy.deepcopy(irati_templates.qos_cube_r_base) + reliable["id"] = len(dif_conf["qosCubes"]) + 1 + dif_conf["qosCubes"].append(reliable) for node in dif.members: difconfs[dif.name][node.name] = copy.deepcopy( - irati_templates.normal_dif_base + dif_conf ) for node in self.nodes: # type: mod.Node diff --git a/rumba/prototypes/irati_templates.py b/rumba/prototypes/irati_templates.py index 877ac21..d5beea5 100644 --- a/rumba/prototypes/irati_templates.py +++ b/rumba/prototypes/irati_templates.py @@ -62,6 +62,129 @@ da_map_base = { } +def generate_qos_cube( + name, + cube_id, + initial_credit=200, + ordered=False, + delay=None, + loss=None, + reliable=False, + data_rxms_nmax=5, + initial_rtx_time=1000 +): + cube = { + "name": name, + "id": cube_id, + "partialDelivery": False, + "orderedDelivery": ordered, + "efcpPolicies": { + "dtpPolicySet": { + "name": "default", + "version": "0" + }, + "initialATimer": 0, + "dtcpPresent": True, + "dtcpConfiguration": { + "dtcpPolicySet": { + "name": "default", + "version": "0" + }, + "rtxControl": False, + "flowControl": True, + "flowControlConfig": { + "rateBased": False, + "windowBased": True, + "windowBasedConfig": { + "maxClosedWindowQueueLength": 10, + "initialCredit": initial_credit + } + } + } + } + } + if delay is not None: + cube["delay"] = delay + if loss is not None: + cube["loss"] = loss + if reliable: + cube["maxAllowableGap"] = 0 + cube["efcpPolicies"]["dtcpConfiguration"]["rtxControl"] = True + cube["efcpPolicies"]["dtcpConfiguration"]["rtxControlConfig"] = { + "dataRxmsNmax": data_rxms_nmax, + "initialRtxTime": initial_rtx_time + } + return cube + + +qos_cube_u_base = { + "name": "unreliablewithflowcontrol", + "id": 1, + "partialDelivery": False, + "orderedDelivery": True, + "efcpPolicies": { + "dtpPolicySet": { + "name": "default", + "version": "0" + }, + "initialATimer": 0, + "dtcpPresent": True, + "dtcpConfiguration": { + "dtcpPolicySet": { + "name": "default", + "version": "0" + }, + "rtxControl": False, + "flowControl": True, + "flowControlConfig": { + "rateBased": False, + "windowBased": True, + "windowBasedConfig": { + "maxClosedWindowQueueLength": 10, + "initialCredit": 200 + } + } + } + } +} + +qos_cube_r_base = { + "name": "reliablewithflowcontrol", + "id": 2, + "partialDelivery": False, + "orderedDelivery": True, + "maxAllowableGap": 0, + "efcpPolicies": { + "dtpPolicySet": { + "name": "default", + "version": "0" + }, + "initialATimer": 0, + "dtcpPresent": True, + "dtcpConfiguration": { + "dtcpPolicySet": { + "name": "default", + "version": "0" + }, + "rtxControl": True, + "rtxControlConfig": { + "dataRxmsNmax": 5, + "initialRtxTime": 1000 + }, + "flowControl": True, + "flowControlConfig": { + "rateBased": False, + "windowBased": True, + "windowBasedConfig": { + "maxClosedWindowQueueLength": 10, + "initialCredit": 200 + } + } + } + } +} + + # Template for a normal DIF configuration file normal_dif_base = { "difType": "normal-ipc", @@ -80,70 +203,7 @@ normal_dif_base = { }, "qosCubes": [ - { - "name": "unreliablewithflowcontrol", - "id": 1, - "partialDelivery": False, - "orderedDelivery": True, - "efcpPolicies": { - "dtpPolicySet": { - "name": "default", - "version": "0" - }, - "initialATimer": 0, - "dtcpPresent": True, - "dtcpConfiguration": { - "dtcpPolicySet": { - "name": "default", - "version": "0" - }, - "rtxControl": False, - "flowControl": True, - "flowControlConfig": { - "rateBased": False, - "windowBased": True, - "windowBasedConfig": { - "maxClosedWindowQueueLength": 10, - "initialCredit": 200 - } - } - } - } - }, { - "name": "reliablewithflowcontrol", - "id": 2, - "partialDelivery": False, - "orderedDelivery": True, - "maxAllowableGap": 0, - "efcpPolicies": { - "dtpPolicySet": { - "name": "default", - "version": "0" - }, - "initialATimer": 0, - "dtcpPresent": True, - "dtcpConfiguration": { - "dtcpPolicySet": { - "name": "default", - "version": "0" - }, - "rtxControl": True, - "rtxControlConfig": { - "dataRxmsNmax": 5, - "initialRtxTime": 1000 - }, - "flowControl": True, - "flowControlConfig": { - "rateBased": False, - "windowBased": True, - "windowBasedConfig": { - "maxClosedWindowQueueLength": 10, - "initialCredit": 200 - } - } - } - } - } + qos_cube_u_base, qos_cube_r_base ], "knownIPCProcessAddresses": [], diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index dfbde9b..b2628cf 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -209,6 +209,11 @@ class Experiment(mod.Experiment): logger.info("Installed on all nodes...") def _bootstrap_prototype(self): + for dif in self.dif_ordering: + if isinstance(dif, mod.NormalDIF): + if len(dif.qos_cubes) != 0: + logger.warn('QoS cubes not (yet) supported by ' + 'the Ouroboros plugin. Will ignore.') logger.info("Starting IRMd on all nodes...") self.setup_ouroboros() logger.info("Creating IPCPs") diff --git a/rumba/prototypes/rlite.py b/rumba/prototypes/rlite.py index c056b2f..7d0bb57 100644 --- a/rumba/prototypes/rlite.py +++ b/rumba/prototypes/rlite.py @@ -177,6 +177,11 @@ class Experiment(mod.Experiment): logger.info("installation complete") def _bootstrap_prototype(self): + for dif in self.dif_ordering: + if isinstance(dif, mod.NormalDIF): + if len(dif.qos_cubes) != 0: + logger.warn('QoS cubes not (yet) supported by ' + 'the rlite plugin. Will ignore.') logger.info("setting up") self.init_nodes() logger.info("software initialized on all nodes") -- cgit v1.2.3