After this call, the Flow object is not readable +or writeable anymore. + +```Python +f.alloc("name") +``` + + will allocate a new flow for an existing Flow object. + +To read / write from a flow: + +```Python # read up to _count_ bytes and return bytes +f.readline(count) # read up to _count_ characters as a string +f.write(buf, count) # write up to _count_ bytes from buffer +f.writeline(str, count) # write up to _count_ characters from string +``` + +## Quality of Service (QoS) + +The QoS spec details have not been finalized in Ouroboros. It is just +here to give a general idea and to control some basics of the flow. +You can specify a QoSSpec for flow allocation. + +For instance, + +```Python +qos = QoSSpec(loss=0, cypher_s=256) +f = flow_alloc("name", qos) +``` + +will create a new flow with FRCP retransmission enabled and encrypted +using a 256-bit ECDHE-AES-SHA3 cypher. + +## Manipulating flows + +A number of methods are available for how to interact with Flow + +```Python +f.set_snd_timeout(0.5) # set timeout for blocking write +f.set_rcv_timeout(1.0) # set timeout for blocking read +f.get_snd_timeout() # get timeout for blocking write +f.get_rcv_timeout() # get timeout for blocking read +f.get_qos() # get the QoSSpec for this flow +f.get_rx_queue_len() # get the number of packets in the rx buffer +f.get_tx_queue_len() # get the number of packets in the tx buffer +f.set_flags(flags) # set a number of flags for this flow +f.get_flags() # get the flags for this flow +``` + +The flags are specified as an enum FlowProperties: + +```Python +class FlowProperties(IntFlag): + ReadOnly + WriteOnly + ReadWrite + Down + NonBlockingRead + NonBlockingWrite + NonBlocking + NoPartialRead + NoPartialWrite +``` + +See the Ouroboros fccntl documentation for more details. + +```shell +man fccntl +``` + +## Event API + +Multiple flows can be monitored for activity in parallel using a +FlowSet and FEventQueue objects. + +FlowSets allow grouping a bunch of Flow objects together to listen for +activity. It can be constructed with an optional list of Flows, or +flows can be added or removed using the following methods: + +```Python +set = FlowSet() # create a flow set, +set.add(f) # add a Flow 'f' to this set +set.remove(f) # remove a Flow 'f' from this set # remove all Flows in this set +``` + +An FEventQueue stores pending events on flows. + +The event types are defined as follows: +```Python +class FEventType(IntFlag): + FlowPkt + FlowDown + FlowUp + FlowAlloc + FlowDealloc +``` + +and can be obtained by calling the next method: + +```Python + f, t = # Return active flow 'f' and type of event 't' +``` + +An FEventQueue is populated from a FlowSet. + +```Python +fq = FEventQueue() # Create an eventqueue +set = FlowSet([f1, f2, f3]) # Create a new set with a couple of Flow objects +set.wait(fq, timeo=1.0) # Wait for 1 second or until event +while f, t = + if t == FEventType.FlowPkt: + msg = f.readline() + ... +set.destroy() +``` + +A flow_set must be destroyed when it goes out of scope. +To avoid having to call destroy, Python's with statement can be used: + +```Python +fq = FEventQueue() +with FlowSet([f]) as fs: + fs.wait(fq) +f2, t = +if t == FEventType.FlowPkt: + line = f2.readline() +``` + +## Examples + +Some example code is in the examples folder. + +## License +pyOuorboros is LGPLv2.1. The examples are 3-clause BSD. diff --git a/examples/ b/examples/ new file mode 100755 index 0000000..395930c --- /dev/null +++ b/examples/ @@ -0,0 +1,62 @@ +#!/bin/python + +# Ouroboros - Copyright (C) 2016 - 2020 +# +# A simple echo application +# +# Dimitri Staessens +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +# OF THE POSSIBILITY OF SUCH DAMAGE. + +from import * +import argparse + + +def client(): + with flow_alloc("oecho") as f: + f.writeline("Hello, PyOuroboros!") + print(f.readline()) + + +def server(): + print("Starting the server.") + while True: + with flow_accept() as f: + print("New flow.") + line = f.readline() + print("Message from client is " + line) + f.writeline(line) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='A simple echo client/server') + parser.add_argument('-l', '--listen', help='run as a server', action='store_true') + args = parser.parse_args() + server() if args.listen is True else client() diff --git a/examples/ b/examples/ new file mode 100755 index 0000000..4d9d1f7 --- /dev/null +++ b/examples/ @@ -0,0 +1,71 @@ +#!/bin/python + +# Ouroboros - Copyright (C) 2016 - 2020 +# +# A simple echo application +# +# Dimitri Staessens +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +# OF THE POSSIBILITY OF SUCH DAMAGE. + +from ouroboros.event import * +import argparse + + +def client(): + with flow_alloc("oecho") as f: + f.writeline("Hello, PyOuroboros!") + print(f.readline()) + + +def server(): + print("Starting the server.") + while True: + with flow_accept() as f: + print("New flow.") + fq = FEventQueue() + with FlowSet([f]) as fs: + fs.wait(fq) + f2, t = + if t != FEventType.FlowPkt: + continue + line = f2.readline() + print("Message from client is " + line) + f2.writeline(line) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='A simple echo client/server') + parser.add_argument('-l', '--listen', help='run as a server', action='store_true') + args = parser.parse_args() + if args.listen is True: + server() + else: + client() diff --git a/ffi/fccntl_wrap.h b/ffi/fccntl_wrap.h new file mode 100644 index 0000000..ab227ea --- /dev/null +++ b/ffi/fccntl_wrap.h @@ -0,0 +1,73 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2020 + * + * An fccntl wrapper + * + * Dimitri Staessens + * Sander Vrijders + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., + */ + +#include + +int flow_set_snd_timeout(int fd, struct timespec * ts) +{ + return fccntl(fd, FLOWSSNDTIMEO, ts); +} + +int flow_set_rcv_timeout(int fd, struct timespec * ts) +{ + return fccntl(fd, FLOWSRCVTIMEO, ts); +} + +int flow_get_snd_timeout(int fd, struct timespec * ts) +{ + return fccntl(fd, FLOWGSNDTIMEO, ts); +} + +int flow_get_rcv_timeout(int fd, struct timespec * ts) +{ + return fccntl(fd, FLOWGRCVTIMEO, ts); +} + +int flow_get_qos(int fd, qosspec_t * qs) +{ + return fccntl(fd, FLOWGQOSSPEC, qs); +} + +int flow_get_rx_qlen(int fd, size_t * sz) +{ + return fccntl(fd, FLOWGRXQLEN, sz); +} + +int flow_get_tx_qlen(int fd, size_t * sz) +{ + return fccntl(fd, FLOWGTXQLEN, sz); +} + +int flow_set_flags(int fd, uint32_t flags) +{ + return fccntl(fd, FLOWSFLAGS, flags); +} + +int flow_get_flags(int fd) +{ + uint32_t flags; + + if (fccntl(fd, FLOWGFLAGS, &flags)) + return -EPERM; + + return (int) flags; +} diff --git a/ffi/ b/ffi/ new file mode 100644 index 0000000..b4ace8e --- /dev/null +++ b/ffi/ @@ -0,0 +1,138 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., +# + +from cffi import FFI + +ffibuilder: FFI = FFI() + +ffibuilder.cdef(""" +/* OUROBOROS QOS.H */ +typedef struct qos_spec { + uint32_t delay; /* In ms */ + uint64_t bandwidth; /* In bits/s */ + uint8_t availability; /* Class of 9s */ + uint32_t loss; /* Packet loss */ + uint32_t ber; /* Bit error rate, errors per billion bits */ + uint8_t in_order; /* In-order delivery, enables FRCT */ + uint32_t max_gap; /* In ms */ + uint16_t cypher_s; /* Cypher strength, 0 = no encryption */ +} qosspec_t; + +/* OUROBOROS DEV.H */ +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_alloc(const char * dst_name, + qosspec_t * qs, + const struct timespec * timeo); + +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_accept(qosspec_t * qs, + const struct timespec * timeo); + +/* Returns flow descriptor, qs updates to supplied QoS. */ +int flow_join(const char * bc, + qosspec_t * qs, + const struct timespec * timeo); + +int flow_dealloc(int fd); + +ssize_t flow_write(int fd, + const void * buf, + size_t count); + +ssize_t flow_read(int fd, + void * buf, + size_t count); + +/*OUROBOROS FCCNTL.H, VIA WRAPPER */ +int flow_set_snd_timeout(int fd, struct timespec * ts); + +int flow_set_rcv_timeout(int fd, struct timespec * ts); + +int flow_get_snd_timeout(int fd, struct timespec * ts); + +int flow_get_rcv_timeout(int fd, struct timespec * ts); + +int flow_get_qos(int fd, qosspec_t * qs); + +int flow_get_rx_qlen(int fd, size_t * sz); + +int flow_get_tx_qlen(int fd, size_t * sz); + +int flow_set_flags(int fd, uint32_t flags); + +int flow_get_flags(int fd); + +/*OUROBOROS FQUEUE.H */ +enum fqtype { + FLOW_PKT = (1 << 0), + FLOW_DOWN = (1 << 1), + FLOW_UP = (1 << 2), + FLOW_ALLOC = (1 << 3), + FLOW_DEALLOC = (1 << 4) +}; + +struct flow_set; + +struct fqueue; + +typedef struct flow_set fset_t; +typedef struct fqueue fqueue_t; + +fset_t * fset_create(void); + +void fset_destroy(fset_t * set); + +fqueue_t * fqueue_create(void); + +void fqueue_destroy(struct fqueue * fq); + +void fset_zero(fset_t * set); + +int fset_add(fset_t * set, + int fd); + +bool fset_has(const fset_t * set, + int fd); + +void fset_del(fset_t * set, + int fd); + +int fqueue_next(fqueue_t * fq); + +int fqueue_type(fqueue_t * fq); + +ssize_t fevent(fset_t * set, + fqueue_t * fq, + const struct timespec * timeo); +""") + +ffibuilder.set_source("_ouroboros_cffi", + """ +#include "ouroboros/qos.h" +#include "ouroboros/dev.h" +#include "fccntl_wrap.h" +#include "ouroboros/fqueue.h" + """, + libraries=['ouroboros-dev'], + extra_compile_args=["-I./ffi/"]) + +if __name__ == "__main__": + ffibuilder.compile(verbose=True) diff --git a/ouroboros/ b/ouroboros/ new file mode 100644 index 0000000..7d29624 --- /dev/null +++ b/ouroboros/ @@ -0,0 +1,398 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., +# + +from _ouroboros_cffi import ffi, lib +import errno +from enum import IntFlag +from ouroboros.qos import * +from ouroboros.qos import _qos_to_qosspec, _fl_to_timespec, _qosspec_to_qos, _timespec_to_fl + +# Some constants +MILLION = 1000 * 1000 +BILLION = 1000 * 1000 * 1000 + + +# ouroboros exceptions +class FlowAllocatedException(Exception): + pass + + +class FlowNotAllocatedException(Exception): + pass + + +class FlowDownException(Exception): + pass + + +class FlowPermissionException(Exception): + pass + + +class FlowException(Exception): + pass + + +class FlowDeallocWarning(Warning): + pass + + +def _raise(e: int) -> None: + if e >= 0: + return + + print("error: " + str(e)) + if e == -errno.ETIMEDOUT: + raise TimeoutError() + if e == -errno.EINVAL: + raise ValueError() + if e == -errno.ENOMEM: + raise MemoryError() + else: + raise FlowException() + + +class FlowProperties(IntFlag): + ReadOnly = 0o0 + WriteOnly = 0o1 + ReadWrite = 0o2 + Down = 0o4 + NonBlockingRead = 0o1000 + NonBlockingWrite = 0o2000 + NonBlocking = NonBlockingRead | NonBlockingWrite + NoPartialRead = 0o10000 + NoPartialWrite = 0o200000 + + +class Flow: + + def __init__(self): + self.__fd: int = -1 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + lib.flow_dealloc(self.__fd) + + def alloc(self, + dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Optional[QoSSpec]: + """ + Allocates a flow with a certain QoS to a destination + + :param dst: The destination name (string) + :param qos: The QoS for the requested flow (QoSSpec) + :param timeo: The timeout for the flow allocation (None -> forever, 0->async) + :return: The QoS for the new flow + """ + + if self.__fd >= 0: + raise FlowAllocatedException() + + _qos = _qos_to_qosspec(qos) + + _timeo = _fl_to_timespec(timeo) + + self.__fd = lib.flow_alloc(dst.encode(), _qos, _timeo) + + _raise(self.__fd) + + return _qosspec_to_qos(_qos) + + def accept(self, + timeo: float = None) -> QoSSpec: + """ + Accepts new flows and returns the QoS + + :param timeo: The timeout for the flow allocation (None -> forever, 0->async) + :return: The QoS for the new flow + """ + + if self.__fd >= 0: + raise FlowAllocatedException() + + _qos ="qosspec_t *") + + _timeo = _fl_to_timespec(timeo) + + self.__fd = lib.flow_accept(_qos, _timeo) + + _raise(self.__fd) + + return _qosspec_to_qos(_qos) + + def join(self, + dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Optional[QoSSpec]: + """ + Join a broadcast layer + + :param dst: The destination broadcast layer name (string) + :param qos: The QoS for the requested flow (QoSSpec) + :param timeo: The timeout for the flow allocation (None -> forever, 0->async) + :return: The QoS for the flow + """ + + if self.__fd >= 0: + raise FlowAllocatedException() + + _qos = _qos_to_qosspec(qos) + + _timeo = _fl_to_timespec(timeo) + + self.__fd = lib.flow_join(dst.encode(), _qos, _timeo) + + _raise(self.__fd) + + return _qosspec_to_qos(_qos) + + def dealloc(self): + """ + Deallocate a flow + + """ + + self.__fd = lib.flow_dealloc(self.__fd) + + if self.__fd < 0: + raise FlowDeallocWarning + + self.__fd = -1 + + def write(self, + buf: bytes, + count: int = None) -> int: + """ + Attempt to write bytes to a flow + + :param buf: Buffer to write from + :param count: Number of bytes to write from the buffer + :return: Number of bytes written + """ + + if self.__fd < 0: + raise FlowNotAllocatedException() + + if count is None: + return lib.flow_write(self.__fd, ffi.from_buffer(buf), len(buf)) + else: + return lib.flow_write(self.__fd, ffi.from_buffer(buf), count) + + def writeline(self, + ln: str) -> int: + """ + Attempt to write a string to a flow + + :param ln: String to write + :return: Number of bytes written + """ + + if self.__fd < 0: + raise FlowNotAllocatedException() + + return self.write(ln.encode(), len(ln)) + + def read(self, + count: int = None) -> bytes: + """ + Attempt to read bytes from a flow + + :param count: Maximum number of bytes to read + :return: Bytes read + """ + + if self.__fd < 0: + raise FlowNotAllocatedException() + + if count is None: + count = 2048 + + _buf ="char []", count) + + result = lib.flow_read(self.__fd, _buf, count) + + return ffi.unpack(_buf, result) + + def readline(self): + """ + + :return: A string + """ + if self.__fd < 0: + raise FlowNotAllocatedException() + + return + + # flow manipulation + def set_snd_timeout(self, + timeo: float): + """ + Set the timeout for blocking writes + """ + _timeo = _fl_to_timespec(timeo) + + if lib.flow_set_snd_timout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + def get_snd_timeout(self) -> float: + """ + Get the timeout for blocking writes + + :return: timeout for blocking writes + """ + _timeo ="struct timespec *") + + if lib.flow_get_snd_timeout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + return _timespec_to_fl(_timeo) + + def set_rcv_timeout(self, + timeo: float): + """ + Set the timeout for blocking writes + """ + _timeo = _fl_to_timespec(timeo) + + if lib.flow_set_rcv_timout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + def get_rcv_timeout(self) -> float: + """ + Get the timeout for blocking reads + + :return: timeout for blocking writes + """ + _timeo ="struct timespec *") + + if lib.flow_get_rcv_timeout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + return _timespec_to_fl(_timeo) + + def get_qos(self) -> QoSSpec: + """ + + :return: Current QoS on the flow + """ + _qos ="qosspec_t *") + + if lib.flow_get_qos(self.__fd, _qos) != 0: + raise FlowPermissionException() + + return _qosspec_to_qos(_qos) + + def get_rx_queue_len(self) -> int: + """ + + :return: + """ + + size ="size_t *") + + if lib.flow_get_rx_qlen(self.__fd, size) != 0: + raise FlowPermissionException() + + return int(size) + + def get_tx_queue_len(self) -> int: + """ + + :return: + """ + + size ="size_t *") + + if lib.flow_get_tx_qlen(self.__fd, size) != 0: + raise FlowPermissionException() + + return int(size) + + def set_flags(self, flags: FlowProperties): + """ + Set flags for this flow. + :param flags: + """ + + _flags ="uint32_t *", int(flags)) + + if lib.flow_set_flag(self.__fd, _flags): + raise FlowPermissionException() + + def get_flags(self) -> FlowProperties: + """ + Get the flags for this flow + """ + + flags = lib.flow_get_flag(self.__fd) + if flags < 0: + raise FlowPermissionException() + + return FlowProperties(int(flags)) + + +def flow_alloc(dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Flow: + """ + + :param dst: Destination name + :param qos: Requested QoS + :param timeo: Timeout to wait for the allocation + :return: A new Flow() + """ + + f = Flow() + f.alloc(dst, qos, timeo) + return f + + +def flow_accept(timeo: float = None) -> Flow: + """ + + :param timeo: Timeout to wait for the allocation + :return: A new Flow() + """ + + f = Flow() + f.accept(timeo) + return f + + +def flow_join(dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Flow: + """ + + :param dst: Broadcast layer name + :param qos: Requested QoS + :param timeo: Timeout to wait for the allocation + :return: A new Flow() + """ + + f = Flow() + f.join(dst, qos, timeo) + return f + + diff --git a/ouroboros/ b/ouroboros/ new file mode 100644 index 0000000..b707c1b --- /dev/null +++ b/ouroboros/ @@ -0,0 +1,147 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., +# + +from import * +from ouroboros.qos import _fl_to_timespec + + +# async API +class FlowEventError(Exception): + pass + + +class FEventType(IntFlag): + FlowPkt = lib.FLOW_PKT + FlowDown = lib.FLOW_DOWN + FlowUp = lib.FLOW_UP + FlowAlloc = lib.FLOW_ALLOC + FlowDealloc = lib.FLOW_DEALLOC + + +class FEventQueue: + """ + A queue of events waiting to be handled + """ + + def __init__(self): + self.__fq = lib.fqueue_create() + if self.__fq is ffi.NULL: + raise MemoryError("Failed to create FEventQueue") + + def __del__(self): + lib.fqueue_destroy(self.__fq) + + def next(self): + """ + Get the next event + :return: Flow and eventtype on that flow + """ + f = Flow() + f._Flow__fd = lib.fqueue_next(self.__fq) + if f._Flow__fd < 0: + raise FlowEventError + + _type = lib.fqueue_type(self.__fq) + if _type < 0: + raise FlowEventError + + return f, _type + + +class FlowSet: + """ + A set of flows that can be monitored for events + """ + def __init__(self, + flows: [Flow] = None): + + self.__set = lib.fset_create() + if self.__set is ffi.NULL: + raise MemoryError("Failed to create FlowSet") + + if flows is not None: + for flow in flows: + if lib.fset_add(self.__set, flow._Flow__fd) != 0: + lib.fset_destroy(self.__set) + self.__set = ffi.NULL + raise MemoryError("Failed to add flow " + str(flow._Flow__fd) + ".") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + lib.fset_destroy(self.__set) + + def add(self, + flow: Flow): + """ + Add a Flow + + :param flow: The flow object to add + """ + + if self.__set is ffi.NULL: + raise ValueError + + if lib.fset_add(self.__set, flow._Flow___fd) != 0: + raise MemoryError("Failed to add flow") + + def zero(self): + """ + Remove all Flows from this set + """ + + if self.__set is ffi.NULL: + raise ValueError + + lib.fset_zero(self.__set) + + def remove(self, + flow: Flow): + """ + Remove a flow from a set + + :param flow: + """ + + if self.__set is ffi.NULL: + raise ValueError + + lib.fset_del(self.__set, flow._Flow__fd) + + def wait(self, + fq: FEventType, + timeo: float = None): + """ + Wait for at least one event on one of the monitored flows + """ + + if self.__set is ffi.NULL: + raise ValueError + + _timeo = _fl_to_timespec(timeo) + + ret = lib.fevent(self.__set, fq._FEventQueue__fq, _timeo) + if ret < 0: + raise FlowEventError + + def destroy(self): + lib.fset_destroy(self.__set) diff --git a/ouroboros/ b/ouroboros/ new file mode 100644 index 0000000..f437ee2 --- /dev/null +++ b/ouroboros/ @@ -0,0 +1,110 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications - QoS +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., +# + +from _ouroboros_cffi import ffi +from math import modf +from typing import Optional + +# Some constants +MILLION = 1000 * 1000 +BILLION = 1000 * 1000 * 1000 + + +class QoSSpec: + """ + delay: In ms, default 1000s + bandwidth: In bits / s, default 0 + availability: Class of 9s, default 0 + loss: Packet loss in ppm, default MILLION + ber: Bit error rate, errors per billion bits. default BILLION + in_order: In-order delivery, enables FRCT, default 0 + max_gap: Maximum interruption in ms, default MILLION + cypher_s: Requested encryption strength in bits + """ + + def __init__(self, + delay: int = MILLION, + bandwidth: int = 0, + availability: int = 0, + loss: int = 1, + ber: int = MILLION, + in_order: int = 0, + max_gap: int = MILLION, + cypher_s: int = 0): + self.delay = delay + self.bandwidth = bandwidth + self.availability = availability + self.loss = loss + self.ber = ber + self.in_order = in_order + self.max_gap = max_gap + self.cypher_s = cypher_s + + +def _fl_to_timespec(timeo: float): + if timeo is None: + return ffi.NULL + elif timeo <= 0: + return"struct timespec *", [0, 0]) + else: + frac, whole = modf(timeo) + _timeo ="struct timespec *") + _timeo.tv_sec = whole + _timeo.tv_nsec = frac * BILLION + return _timeo + + +def _timespec_to_fl(_timeo) -> Optional[float]: + if _timeo is ffi.NULL: + return None + elif _timeo.tv_sec <= 0 and _timeo.tv_nsec == 0: + return 0 + else: + return _timeo.tv_sec + _timeo.tv_nsec / BILLION + + +def _qos_to_qosspec(qos: QoSSpec): + if qos is None: + return ffi.NULL + else: + return"qosspec_t *", + [qos.delay, + qos.bandwidth, + qos.availability, + qos.loss, + qos.ber, + qos.in_order, + qos.max_gap, + qos.cypher_s]) + + +def _qosspec_to_qos(_qos) -> Optional[QoSSpec]: + if _qos is ffi.NULL: + return None + else: + return QoSSpec(delay=_qos.delay, + bandwidth=_qos.bandwidth, + availability=_qos.availability, + loss=_qos.loss, + ber=_qos.ber, + in_order=_qos.in_order, + max_gap=_qos.max_gap, + cypher_s=_qos.cypher_s) diff --git a/ b/ new file mode 100755 index 0000000..023f3b4 --- /dev/null +++ b/ @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +import setuptools + +setuptools.setup( + name='PyOuroboros', + version=0.17, + url='', + keywords='ouroboros IPC subsystem', + author='Dimitri Staessens', + author_email='', + license='LGPLv2.1', + description='Python API for Ouroboros', + packages=[ + 'ouroboros' + ], + setup_requires=[ + "cffi>=1.0.0" + ], + cffi_modules=[ + "ffi/" + ], + install_requires=[ + "cffi>=1.0.0" + ]) -- cgit v1.2.3