From 5974215c9864ca72945b553f5374dbc8ba9a191d Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sat, 16 May 2020 17:22:09 +0200 Subject: Initial commit: Basic Ouroboros API Signed-off-by: Dimitri Staessens --- ouroboros/dev.py | 398 +++++++++++++++++++++++++++++++++++++++++++++++++++++ ouroboros/event.py | 147 ++++++++++++++++++++ ouroboros/qos.py | 110 +++++++++++++++ 3 files changed, 655 insertions(+) create mode 100644 ouroboros/dev.py create mode 100644 ouroboros/event.py create mode 100644 ouroboros/qos.py (limited to 'ouroboros') diff --git a/ouroboros/dev.py b/ouroboros/dev.py new file mode 100644 index 0000000..7d29624 --- /dev/null +++ b/ouroboros/dev.py @@ -0,0 +1,398 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +from _ouroboros_cffi import ffi, lib +import errno +from enum import IntFlag +from ouroboros.qos import * +from ouroboros.qos import _qos_to_qosspec, _fl_to_timespec, _qosspec_to_qos, _timespec_to_fl + +# Some constants +MILLION = 1000 * 1000 +BILLION = 1000 * 1000 * 1000 + + +# ouroboros exceptions +class FlowAllocatedException(Exception): + pass + + +class FlowNotAllocatedException(Exception): + pass + + +class FlowDownException(Exception): + pass + + +class FlowPermissionException(Exception): + pass + + +class FlowException(Exception): + pass + + +class FlowDeallocWarning(Warning): + pass + + +def _raise(e: int) -> None: + if e >= 0: + return + + print("error: " + str(e)) + if e == -errno.ETIMEDOUT: + raise TimeoutError() + if e == -errno.EINVAL: + raise ValueError() + if e == -errno.ENOMEM: + raise MemoryError() + else: + raise FlowException() + + +class FlowProperties(IntFlag): + ReadOnly = 0o0 + WriteOnly = 0o1 + ReadWrite = 0o2 + Down = 0o4 + NonBlockingRead = 0o1000 + NonBlockingWrite = 0o2000 + NonBlocking = NonBlockingRead | NonBlockingWrite + NoPartialRead = 0o10000 + NoPartialWrite = 0o200000 + + +class Flow: + + def __init__(self): + self.__fd: int = -1 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + lib.flow_dealloc(self.__fd) + + def alloc(self, + dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Optional[QoSSpec]: + """ + Allocates a flow with a certain QoS to a destination + + :param dst: The destination name (string) + :param qos: The QoS for the requested flow (QoSSpec) + :param timeo: The timeout for the flow allocation (None -> forever, 0->async) + :return: The QoS for the new flow + """ + + if self.__fd >= 0: + raise FlowAllocatedException() + + _qos = _qos_to_qosspec(qos) + + _timeo = _fl_to_timespec(timeo) + + self.__fd = lib.flow_alloc(dst.encode(), _qos, _timeo) + + _raise(self.__fd) + + return _qosspec_to_qos(_qos) + + def accept(self, + timeo: float = None) -> QoSSpec: + """ + Accepts new flows and returns the QoS + + :param timeo: The timeout for the flow allocation (None -> forever, 0->async) + :return: The QoS for the new flow + """ + + if self.__fd >= 0: + raise FlowAllocatedException() + + _qos = ffi.new("qosspec_t *") + + _timeo = _fl_to_timespec(timeo) + + self.__fd = lib.flow_accept(_qos, _timeo) + + _raise(self.__fd) + + return _qosspec_to_qos(_qos) + + def join(self, + dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Optional[QoSSpec]: + """ + Join a broadcast layer + + :param dst: The destination broadcast layer name (string) + :param qos: The QoS for the requested flow (QoSSpec) + :param timeo: The timeout for the flow allocation (None -> forever, 0->async) + :return: The QoS for the flow + """ + + if self.__fd >= 0: + raise FlowAllocatedException() + + _qos = _qos_to_qosspec(qos) + + _timeo = _fl_to_timespec(timeo) + + self.__fd = lib.flow_join(dst.encode(), _qos, _timeo) + + _raise(self.__fd) + + return _qosspec_to_qos(_qos) + + def dealloc(self): + """ + Deallocate a flow + + """ + + self.__fd = lib.flow_dealloc(self.__fd) + + if self.__fd < 0: + raise FlowDeallocWarning + + self.__fd = -1 + + def write(self, + buf: bytes, + count: int = None) -> int: + """ + Attempt to write bytes to a flow + + :param buf: Buffer to write from + :param count: Number of bytes to write from the buffer + :return: Number of bytes written + """ + + if self.__fd < 0: + raise FlowNotAllocatedException() + + if count is None: + return lib.flow_write(self.__fd, ffi.from_buffer(buf), len(buf)) + else: + return lib.flow_write(self.__fd, ffi.from_buffer(buf), count) + + def writeline(self, + ln: str) -> int: + """ + Attempt to write a string to a flow + + :param ln: String to write + :return: Number of bytes written + """ + + if self.__fd < 0: + raise FlowNotAllocatedException() + + return self.write(ln.encode(), len(ln)) + + def read(self, + count: int = None) -> bytes: + """ + Attempt to read bytes from a flow + + :param count: Maximum number of bytes to read + :return: Bytes read + """ + + if self.__fd < 0: + raise FlowNotAllocatedException() + + if count is None: + count = 2048 + + _buf = ffi.new("char []", count) + + result = lib.flow_read(self.__fd, _buf, count) + + return ffi.unpack(_buf, result) + + def readline(self): + """ + + :return: A string + """ + if self.__fd < 0: + raise FlowNotAllocatedException() + + return self.read().decode() + + # flow manipulation + def set_snd_timeout(self, + timeo: float): + """ + Set the timeout for blocking writes + """ + _timeo = _fl_to_timespec(timeo) + + if lib.flow_set_snd_timout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + def get_snd_timeout(self) -> float: + """ + Get the timeout for blocking writes + + :return: timeout for blocking writes + """ + _timeo = ffi.new("struct timespec *") + + if lib.flow_get_snd_timeout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + return _timespec_to_fl(_timeo) + + def set_rcv_timeout(self, + timeo: float): + """ + Set the timeout for blocking writes + """ + _timeo = _fl_to_timespec(timeo) + + if lib.flow_set_rcv_timout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + def get_rcv_timeout(self) -> float: + """ + Get the timeout for blocking reads + + :return: timeout for blocking writes + """ + _timeo = ffi.new("struct timespec *") + + if lib.flow_get_rcv_timeout(self.__fd, _timeo) != 0: + raise FlowPermissionException() + + return _timespec_to_fl(_timeo) + + def get_qos(self) -> QoSSpec: + """ + + :return: Current QoS on the flow + """ + _qos = ffi.new("qosspec_t *") + + if lib.flow_get_qos(self.__fd, _qos) != 0: + raise FlowPermissionException() + + return _qosspec_to_qos(_qos) + + def get_rx_queue_len(self) -> int: + """ + + :return: + """ + + size = ffi.new("size_t *") + + if lib.flow_get_rx_qlen(self.__fd, size) != 0: + raise FlowPermissionException() + + return int(size) + + def get_tx_queue_len(self) -> int: + """ + + :return: + """ + + size = ffi.new("size_t *") + + if lib.flow_get_tx_qlen(self.__fd, size) != 0: + raise FlowPermissionException() + + return int(size) + + def set_flags(self, flags: FlowProperties): + """ + Set flags for this flow. + :param flags: + """ + + _flags = ffi.new("uint32_t *", int(flags)) + + if lib.flow_set_flag(self.__fd, _flags): + raise FlowPermissionException() + + def get_flags(self) -> FlowProperties: + """ + Get the flags for this flow + """ + + flags = lib.flow_get_flag(self.__fd) + if flags < 0: + raise FlowPermissionException() + + return FlowProperties(int(flags)) + + +def flow_alloc(dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Flow: + """ + + :param dst: Destination name + :param qos: Requested QoS + :param timeo: Timeout to wait for the allocation + :return: A new Flow() + """ + + f = Flow() + f.alloc(dst, qos, timeo) + return f + + +def flow_accept(timeo: float = None) -> Flow: + """ + + :param timeo: Timeout to wait for the allocation + :return: A new Flow() + """ + + f = Flow() + f.accept(timeo) + return f + + +def flow_join(dst: str, + qos: QoSSpec = None, + timeo: float = None) -> Flow: + """ + + :param dst: Broadcast layer name + :param qos: Requested QoS + :param timeo: Timeout to wait for the allocation + :return: A new Flow() + """ + + f = Flow() + f.join(dst, qos, timeo) + return f + + diff --git a/ouroboros/event.py b/ouroboros/event.py new file mode 100644 index 0000000..b707c1b --- /dev/null +++ b/ouroboros/event.py @@ -0,0 +1,147 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +from ouroboros.dev import * +from ouroboros.qos import _fl_to_timespec + + +# async API +class FlowEventError(Exception): + pass + + +class FEventType(IntFlag): + FlowPkt = lib.FLOW_PKT + FlowDown = lib.FLOW_DOWN + FlowUp = lib.FLOW_UP + FlowAlloc = lib.FLOW_ALLOC + FlowDealloc = lib.FLOW_DEALLOC + + +class FEventQueue: + """ + A queue of events waiting to be handled + """ + + def __init__(self): + self.__fq = lib.fqueue_create() + if self.__fq is ffi.NULL: + raise MemoryError("Failed to create FEventQueue") + + def __del__(self): + lib.fqueue_destroy(self.__fq) + + def next(self): + """ + Get the next event + :return: Flow and eventtype on that flow + """ + f = Flow() + f._Flow__fd = lib.fqueue_next(self.__fq) + if f._Flow__fd < 0: + raise FlowEventError + + _type = lib.fqueue_type(self.__fq) + if _type < 0: + raise FlowEventError + + return f, _type + + +class FlowSet: + """ + A set of flows that can be monitored for events + """ + def __init__(self, + flows: [Flow] = None): + + self.__set = lib.fset_create() + if self.__set is ffi.NULL: + raise MemoryError("Failed to create FlowSet") + + if flows is not None: + for flow in flows: + if lib.fset_add(self.__set, flow._Flow__fd) != 0: + lib.fset_destroy(self.__set) + self.__set = ffi.NULL + raise MemoryError("Failed to add flow " + str(flow._Flow__fd) + ".") + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, tb): + lib.fset_destroy(self.__set) + + def add(self, + flow: Flow): + """ + Add a Flow + + :param flow: The flow object to add + """ + + if self.__set is ffi.NULL: + raise ValueError + + if lib.fset_add(self.__set, flow._Flow___fd) != 0: + raise MemoryError("Failed to add flow") + + def zero(self): + """ + Remove all Flows from this set + """ + + if self.__set is ffi.NULL: + raise ValueError + + lib.fset_zero(self.__set) + + def remove(self, + flow: Flow): + """ + Remove a flow from a set + + :param flow: + """ + + if self.__set is ffi.NULL: + raise ValueError + + lib.fset_del(self.__set, flow._Flow__fd) + + def wait(self, + fq: FEventType, + timeo: float = None): + """ + Wait for at least one event on one of the monitored flows + """ + + if self.__set is ffi.NULL: + raise ValueError + + _timeo = _fl_to_timespec(timeo) + + ret = lib.fevent(self.__set, fq._FEventQueue__fq, _timeo) + if ret < 0: + raise FlowEventError + + def destroy(self): + lib.fset_destroy(self.__set) diff --git a/ouroboros/qos.py b/ouroboros/qos.py new file mode 100644 index 0000000..f437ee2 --- /dev/null +++ b/ouroboros/qos.py @@ -0,0 +1,110 @@ +# +# Ouroboros - Copyright (C) 2016 - 2020 +# +# Python API for applications - QoS +# +# Dimitri Staessens +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public License +# version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +from _ouroboros_cffi import ffi +from math import modf +from typing import Optional + +# Some constants +MILLION = 1000 * 1000 +BILLION = 1000 * 1000 * 1000 + + +class QoSSpec: + """ + delay: In ms, default 1000s + bandwidth: In bits / s, default 0 + availability: Class of 9s, default 0 + loss: Packet loss in ppm, default MILLION + ber: Bit error rate, errors per billion bits. default BILLION + in_order: In-order delivery, enables FRCT, default 0 + max_gap: Maximum interruption in ms, default MILLION + cypher_s: Requested encryption strength in bits + """ + + def __init__(self, + delay: int = MILLION, + bandwidth: int = 0, + availability: int = 0, + loss: int = 1, + ber: int = MILLION, + in_order: int = 0, + max_gap: int = MILLION, + cypher_s: int = 0): + self.delay = delay + self.bandwidth = bandwidth + self.availability = availability + self.loss = loss + self.ber = ber + self.in_order = in_order + self.max_gap = max_gap + self.cypher_s = cypher_s + + +def _fl_to_timespec(timeo: float): + if timeo is None: + return ffi.NULL + elif timeo <= 0: + return ffi.new("struct timespec *", [0, 0]) + else: + frac, whole = modf(timeo) + _timeo = ffi.new("struct timespec *") + _timeo.tv_sec = whole + _timeo.tv_nsec = frac * BILLION + return _timeo + + +def _timespec_to_fl(_timeo) -> Optional[float]: + if _timeo is ffi.NULL: + return None + elif _timeo.tv_sec <= 0 and _timeo.tv_nsec == 0: + return 0 + else: + return _timeo.tv_sec + _timeo.tv_nsec / BILLION + + +def _qos_to_qosspec(qos: QoSSpec): + if qos is None: + return ffi.NULL + else: + return ffi.new("qosspec_t *", + [qos.delay, + qos.bandwidth, + qos.availability, + qos.loss, + qos.ber, + qos.in_order, + qos.max_gap, + qos.cypher_s]) + + +def _qosspec_to_qos(_qos) -> Optional[QoSSpec]: + if _qos is ffi.NULL: + return None + else: + return QoSSpec(delay=_qos.delay, + bandwidth=_qos.bandwidth, + availability=_qos.availability, + loss=_qos.loss, + ber=_qos.ber, + in_order=_qos.in_order, + max_gap=_qos.max_gap, + cypher_s=_qos.cypher_s) -- cgit v1.2.3