# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# pylint: disable=redefined-builtin
import numbers
from six import add_metaclass
from six.moves import xrange
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
from spinn_utilities.overrides import overrides
from .abstract_sized import AbstractSized
from .multiple_values_exception import MultipleValuesException
import numpy
[docs]@add_metaclass(AbstractBase)
class AbstractList(AbstractSized):
""" A ranged implementation of list.
Functions that change the size of the list are *not* supported.\
These include::
__setitem__ where key >= len
__delitem__
append
extend
insert
pop
remove
Function that manipulate the list based on values are not supported.\
These include::
reverse, __reversed__
sort
In the current version the IDs are zero base consecutive numbers so there\
is no difference between value-based IDs and index-based IDs\
but this could change in the future.
Supports the following arithmetic operations over the list:
`+`
element-wise addition or addition of a single scalar
`-`
element-wise subtraction or subtraction of a single scalar
`*`
element-wise multiplication or multiplication by a single scalar
`/`
element-wise true division or true division by a single scalar
`//`
element-wise floor division or floor division by a single scalar
"""
__slots__ = [
"_key"]
def __init__(self, size, key=None):
"""
:param size: Fixed length of the list
:param key: The dict key this list covers.\
This is used only for better Exception messages
"""
super(AbstractList, self).__init__(size)
self._key = key
[docs] @abstractmethod
def range_based(self):
""" Shows if the list is suited to deal with ranges or not.
All list must implement all the range functions, \
but there are times when using ranges will probably be slower than \
using individual values.\
For example the individual values may be stored in a list in which \
case the ranges are created on demand.
:return: True if and only if Ranged based calls are recommended.
"""
def __len__(self):
""" Size of the list, irrespective of actual values
:return: the initial and Fixed size of the list
"""
return self._size
def __eq__(self, other):
if isinstance(other, AbstractList):
if self.range_based and other.range_based:
return numpy.array_equal(list(self.iter_ranges()),
list(other.iter_ranges()))
return numpy.array_equal(list(self), list(other))
def __ne__(self, other):
if not isinstance(other, AbstractList):
return True
return not self.__eq__(other)
def __str__(self):
return str(list(self))
__repr__ = __str__
[docs] def get_single_value_all(self):
""" If possible, returns a single value shared by the whole list.
For multiple values use `for x in list`, `iter(list)` or `list.iter`,\
or one of the `iter_ranges` methods
:return: Value shared by all elements in the list
:raises spinn_utilities.ranged.MultipleValuesException: \
If even one elements has a different value
"""
# This is not elegant code but as the ranges could be created on the
# fly the best way.
have_item = False
only_range = None
for this_range in self.iter_ranges():
if have_item:
# If we can get another range, there must be more than one
# value, so raise the exception
raise MultipleValuesException(
self._key, only_range[2], this_range[2])
have_item = True
only_range = this_range
if not have_item:
# Pretend we totally failed
raise StopIteration()
# There isn't another range, so return the value from the only range
return only_range[2]
[docs] @abstractmethod
def get_value_by_id(self, id): # @ReservedAssignment
""" Returns the value for one item in the list
:param id: One of the IDs of an element in the list
:type id: int
:return: The value of that element
"""
[docs] @abstractmethod
def get_single_value_by_slice(self, slice_start, slice_stop):
""" If possible, returns a single value shared by the whole slice list.
For multiple values, use `for x in list`, `iter(list)`, `list.iter`,\
or one of the `iter_ranges` methods
:return: Value shared by all elements in the slice
:raises spinn_utilities.ranged.MultipleValuesException: \
If even one elements has a different value. \
Not thrown if elements outside of the slice have a different value
"""
def __getslice__(self, start, stop):
return list(self.iter_by_slice(start, stop))
[docs] @abstractmethod
def get_single_value_by_ids(self, ids):
""" If possible, returns a single value shared by all the IDs.
For multiple values, use `for x in list`, `iter(list)`,\
`list.iter`, or one of the `iter_ranges` methods.
:return: Value shared by all elements with these IDs
:raises spinn_utilities.ranged.MultipleValuesException: \
If even one elements has a different value. \
Not thrown if elements outside of the IDs have a different value,\
even if these elements are between the ones pointed to by IDs
"""
def __getitem__(self, selector):
""" Supports the `list[x]` to return an element or slice of the list.
:param selector: The int ID, slice
:return: The element[key] or the slice
"""
# If the key is a slice, get the values from the slice
if isinstance(selector, slice):
# If the slice is continuous, use the continuous slice getter
if selector.step is None or selector.step == 1:
return list(self.iter_by_slice(
slice_start=selector.start, slice_stop=selector.stop))
# Otherwise get the items one by one using the start, stop, and
# step from the slice
return [self[i] for i in xrange(*selector.indices(self._size))]
# If the key is an int, get the single value
elif isinstance(selector, int):
# Handle negative indices
if selector < 0:
selector += len(self)
return self.get_value_by_id(selector)
else:
raise TypeError("Invalid argument type.")
[docs] def iter_by_id(self, id): # @ReservedAssignment
""" Fast but *not* update-safe iterator by one ID.
While `next` can only be called once, this is an iterator so it can\
be mixed in with other iterators.
:param id: ID
:return: yields the elements
"""
yield self.get_value_by_id(id)
[docs] def iter_by_ids(self, ids):
""" Fast but *not* update-safe iterator by collection of IDs.
.. note::
Duplicate/Repeated elements are yielded for each ID.
:param ids: IDs
:return: yields the elements pointed to by IDs
"""
ranges = self.iter_ranges()
(_, stop, value) = next(ranges)
for id_value in ids:
# If range is too far ahead, reset to start
if id_value < stop:
ranges = self.iter_ranges()
(_, stop, value) = next(ranges)
# Move on until the ID is in range
while id_value >= stop:
(_, stop, value) = next(ranges)
yield value
[docs] def iter(self):
""" Update-safe iterator of all elements.
.. note::
Duplicate/Repeated elements are yielded for each ID
:return: yields each element one by one
"""
for id_value in xrange(self._size):
yield self.get_value_by_id(id_value)
def __iter__(self):
""" Fast but *not* update-safe iterator of all elements.
.. note::
Duplicate/Repeated elements are yielded for each ID
:return: yields each element one by one
"""
try:
if self.range_based():
for (start, stop, value) in self.iter_ranges():
for _ in xrange(stop - start):
yield value
else:
for id_value in xrange(self._size):
yield self.get_value_by_id(id_value)
except StopIteration:
return
[docs] def iter_by_slice(self, slice_start, slice_stop):
""" Fast but *not* update-safe iterator of all elements in the slice.
.. note::
Duplicate/Repeated elements are yielded for each ID
:return: yields each element one by one
"""
slice_start, slice_stop = self._check_slice_in_range(
slice_start, slice_stop)
if self.range_based():
for (start, stop, value) in \
self.iter_ranges_by_slice(slice_start, slice_stop):
for _ in xrange(start, stop):
yield value
else:
for id_value in xrange(slice_start, slice_stop):
yield self.get_value_by_id(id_value)
[docs] def iter_by_selector(self, selector=None):
""" Fast but *not* update-safe iterator of all elements in the slice.
:param selector: See :py:meth:`AbstractSized.selector_to_ids`
"""
# No Selector so iter all fast
if selector is None:
return self.__iter__()
if isinstance(selector, int):
# Handle negative indices
if selector < 0:
selector += len(self)
return self.iter_by_id(selector)
# If the key is a slice, get the values from the slice
if isinstance(selector, slice):
# If the slice is continuous, use the continuous slice getter
if selector.step is None or selector.step == 1:
return self.iter_by_slice(selector.start, selector.stop)
else:
(slice_start, slice_stop, step) = selector.indices(self._size)
return self.iter_by_ids(range(slice_start, slice_stop, step))
ids = self.selector_to_ids(selector)
return self.iter_by_ids(ids)
[docs] def get_values(self, selector=None):
""" Get the value all elements pointed to the selector.
Note: Unlike `__get_item__` this method always returns a list even if\
the selector is a single int
:param selector: See :py:meth:`AbstractSized.selector_to_ids`
:return: returns a list if the item which may be empty or have only \
single value
"""
return list(self.iter_by_selector(selector))
def __contains__(self, item):
return any(numpy.array_equal(value, item)
for (_, _, value) in self.iter_ranges())
[docs] def count(self, x):
""" Counts the number of elements in the list with value ``x``
:param x:
:return:
"""
return sum(
stop - start
for (start, stop, value) in self.iter_ranges()
if numpy.array_equal(value, x))
[docs] def index(self, x):
""" Finds the first ID of the first element in the list with value\
``x``
:param x:
:return:
"""
for (start, _, value) in self.iter_ranges():
if numpy.array_equal(value, x):
return start
raise ValueError("{} is not in list".format(x))
[docs] @abstractmethod
def iter_ranges(self):
""" Fast but *not* update-safe iterator of the ranges.
:return: yields each range one by one
"""
[docs] def iter_ranges_by_id(self, id): # @ReservedAssignment
""" Iterator of the range for this ID
.. note::
The start and stop of the range will be reduced to just the ID
This method purpose is so one a control method can select which\
iterator to use.
:return: yields the one range
"""
self._check_id_in_range(id)
for (_, stop, value) in self.iter_ranges():
if id < stop:
yield (id, id + 1, value)
break
[docs] @abstractmethod
def iter_ranges_by_slice(self, slice_start, slice_stop):
""" Fast but *not* update-safe iterator of the ranges covered by this\
slice.
.. note::
The start and stop of the range will be reduced to just the\
IDs inside the slice.
:return: yields each range one by one
"""
[docs] def iter_ranges_by_ids(self, ids):
""" Fast but *not* update-safe iterator of the ranges covered by these\
IDs.
For consecutive IDs where the elements have the same value a single\
range may be yielded.
.. note::
The start and stop of the range will be reduced to just the IDs
:return: yields each range one by one
"""
range_pointer = 0
result = None
ranges = list(self.iter_ranges())
for id_value in ids:
# check if ranges reset so too far ahead
if id_value < ranges[range_pointer][0]:
range_pointer = 0
while id_value > ranges[range_pointer][0]:
range_pointer += 1
# check if pointer needs to move on
while id_value >= ranges[range_pointer][1]:
range_pointer += 1
if result is not None:
if (result[1] == id_value and numpy.array_equal(
result[2], ranges[range_pointer][2])):
result = (result[0], id_value + 1, result[2])
continue
yield result
result = (id_value, id_value + 1, ranges[range_pointer][2])
yield result
[docs] @abstractmethod
def get_default(self):
""" Gets the default value of the list. \
Just in case we later allow to increase the number of elements.
:return: Default value
"""
def __add__(self, other):
""" Support for `new_list = list1 + list2`. \
Applies the add operator over this and other to create a new list
The values of the new list are created on the fly so any changes to\
the original lists are reflected.
:param other: another list
:type other: AbstractList
:return: new list
:rtype: AbstractList
"""
if isinstance(other, AbstractList):
return DualList(
left=self, right=other, operation=lambda x, y: x + y)
if isinstance(other, numbers.Number):
return SingleList(a_list=self, operation=lambda x: x + other)
raise Exception("__add__ operation only supported for other "
"RangedLists and numerical Values")
def __sub__(self, other):
""" Support for `new_list = list1 - list2`. \
Applies the add operator over this and other to create a new list.
The values of the new list are created on the fly so any changes to\
the original lists are reflected.
:param other: another list
:type other: AbstractList
:return: new list
:rtype: AbstractList
"""
if isinstance(other, AbstractList):
return DualList(
left=self, right=other, operation=lambda x, y: x - y)
if isinstance(other, numbers.Number):
return SingleList(a_list=self, operation=lambda x: x - other)
raise Exception("__sub__ operation only supported for other "
"RangedLists and numerical Values")
def __mul__(self, other):
""" Support for `new_list = list1 * list2`. \
Applies the multiplication operator over this and other.
The values of the new list are created on the fly so any changes to\
the original lists are reflected.
:param other: another list
:type other: AbstractList
:return: new list
:rtype: AbstractList
"""
if isinstance(other, AbstractList):
return DualList(
left=self, right=other, operation=lambda x, y: x * y)
if isinstance(other, numbers.Number):
return SingleList(a_list=self, operation=lambda x: x * other)
raise Exception("__mul__ operation only supported for other "
"RangedLists and numerical Values")
def __div__(self, other):
""" Support for `new_list = list1 / list2`. \
Applies the division operator over this and other to create a new list.
The values of the new list are created on the fly so any changes to\
the original lists are reflected.
:param other: another list
:type other: AbstractList
:return: new list
:rtype: AbstractList
"""
if isinstance(other, AbstractList):
return DualList(
left=self, right=other, operation=lambda x, y: x / y)
if isinstance(other, numbers.Number):
if numpy.isin(0, other):
raise ZeroDivisionError()
return SingleList(a_list=self, operation=lambda x: x / other)
raise Exception("__div__ operation only supported for other "
"RangedLists and numerical Values")
# Python 3 support
__truediv__ = __div__
def __floordiv__(self, other):
""" Support for `new_list = list1 // list2`. \
Applies the floor division operator over this and other.
:param other: another list
:type other: AbstractList
:return: new list
:rtype: AbstractList
"""
if isinstance(other, AbstractList):
return DualList(
left=self, right=other, operation=lambda x, y: x // y)
if isinstance(other, numbers.Number):
if numpy.isin(0, other):
raise ZeroDivisionError()
return SingleList(a_list=self, operation=lambda x: x // other)
raise Exception("__floordiv__ operation only supported for other "
"RangedLists and numerical Values")
[docs] def apply_operation(self, operation):
""" Applies a function on the list to create a new one. \
The values of the new list are created on the fly so any changes to\
the original lists are reflected.
:param operation: \
A function that can be applied over the individual values to\
create new ones.
:return: new list
:rtype: :py:class:`.AbstractList`
"""
return SingleList(a_list=self, operation=operation)
[docs]@add_metaclass(AbstractBase)
class SingleList(AbstractList):
""" A List that performs an operation on the elements of another list.
"""
__slots__ = [
"_a_list", "_operation"]
def __init__(self, a_list, operation, key=None):
"""
:param a_list: The list to perform the operation on
:param operation:\
A function which takes a single value and returns the result of\
the operation on that value
:param key: The dict key this list covers.\
This is used only for better Exception messages
"""
super(SingleList, self).__init__(size=len(a_list), key=key)
self._a_list = a_list
self._operation = operation
[docs] @overrides(AbstractList.range_based)
def range_based(self):
return self._a_list.range_based()
[docs] @overrides(AbstractList.get_value_by_id)
def get_value_by_id(self, id): # @ReservedAssignment
return self._operation(self._a_list.get_value_by_id(id))
[docs] @overrides(AbstractList.get_single_value_by_slice)
def get_single_value_by_slice(self, slice_start, slice_stop):
return self._operation(self._a_list.get_single_value_by_slice(
slice_start, slice_stop))
[docs] @overrides(AbstractList.get_single_value_by_ids)
def get_single_value_by_ids(self, ids):
return self._operation(self._a_list.get_single_value_by_ids(ids))
[docs] @overrides(AbstractList.iter_ranges)
def iter_ranges(self):
for (start, stop, value) in self._a_list.iter_ranges():
yield (start, stop, self._operation(value))
[docs] @overrides(AbstractList.get_default)
def get_default(self):
return self._operation(self._a_list.get_default())
[docs] @overrides(AbstractList.iter_ranges_by_slice)
def iter_ranges_by_slice(self, slice_start, slice_stop):
for (start, stop, value) in \
self._a_list.iter_ranges_by_slice(slice_start, slice_stop):
yield (start, stop, self._operation(value))
[docs]@add_metaclass(AbstractBase)
class DualList(AbstractList):
""" A list which combines two other lists with an operation.
"""
__slots__ = [
"_left", "_operation", "_right"]
def __init__(self, left, right, operation, key=None):
"""
:param left: The first list to combine
:param right: The second list to combine
:param operation:\
The operation to perform as a function that takes two values and\
returns the result of the operation
:param key:\
The dict key this list covers.\
This is used only for better Exception messages
"""
if len(left) != len(right):
raise Exception("Two list must have the same size")
super(DualList, self).__init__(size=len(left), key=key)
self._left = left
self._right = right
self._operation = operation
[docs] @overrides(AbstractList.range_based)
def range_based(self):
return self._left.range_based() and self._right.range_based()
[docs] @overrides(AbstractList.get_value_by_id)
def get_value_by_id(self, id): # @ReservedAssignment
return self._operation(
self._left.get_value_by_id(id), self._right.get_value_by_id(id))
[docs] @overrides(AbstractList.get_single_value_by_slice)
def get_single_value_by_slice(self, slice_start, slice_stop):
return self._operation(
self._left.get_single_value_by_slice(slice_start, slice_stop),
self._right.get_single_value_by_slice(slice_start, slice_stop))
[docs] @overrides(AbstractList.get_single_value_by_ids)
def get_single_value_by_ids(self, ids):
return self._operation(
self._left.get_single_value_by_ids(ids),
self._right.get_single_value_by_ids(ids))
[docs] @overrides(AbstractList.iter_by_slice)
def iter_by_slice(self, slice_start, slice_stop):
slice_start, slice_stop = self._check_slice_in_range(
slice_start, slice_stop)
if self._left.range_based():
if self._right.range_based():
# Both lists are range based
for (start, stop, value) in \
self.iter_ranges_by_slice(slice_start, slice_stop):
for _ in range(start, stop):
yield value
else:
# Left list is range based, right is not
left_iter = self._left.iter_ranges_by_slice(
slice_start, slice_stop)
right_iter = self._right.iter_by_slice(slice_start, slice_stop)
for (start, stop, left_value) in left_iter:
for _ in range(start, stop):
yield self._operation(left_value, next(right_iter))
else:
if self._right.range_based():
# Right list is range based left is not
left_iter = self._left.iter_by_slice(
slice_start, slice_stop)
right_iter = self._right.iter_ranges_by_slice(
slice_start, slice_stop)
for (start, stop, right_value) in right_iter:
for _ in range(start, stop):
yield self._operation(next(left_iter), right_value)
else:
# Neither list is range based
left_iter = self._left.iter_by_slice(slice_start, slice_stop)
right_iter = self._right.iter_by_slice(slice_start, slice_stop)
while True:
try:
yield self._operation(
next(left_iter), next(right_iter))
except StopIteration:
return
[docs] @overrides(AbstractList.iter_ranges)
def iter_ranges(self):
left_iter = self._left.iter_ranges()
right_iter = self._right.iter_ranges()
return self._merge_ranges(left_iter, right_iter)
[docs] @overrides(AbstractList.iter_ranges_by_slice)
def iter_ranges_by_slice(self, slice_start, slice_stop):
left_iter = self._left.iter_ranges_by_slice(slice_start, slice_stop)
right_iter = self._right.iter_ranges_by_slice(slice_start, slice_stop)
return self._merge_ranges(left_iter, right_iter)
def _merge_ranges(self, left_iter, right_iter):
(left_start, left_stop, left_value) = next(left_iter)
(right_start, right_stop, right_value) = next(right_iter)
try:
while True:
yield (max(left_start, right_start),
min(left_stop, right_stop),
self._operation(left_value, right_value))
if left_stop < right_stop:
(left_start, left_stop, left_value) = next(left_iter)
elif left_stop > right_stop:
(right_start, right_stop, right_value) = next(right_iter)
else:
(left_start, left_stop, left_value) = next(left_iter)
(right_start, right_stop, right_value) = next(right_iter)
except StopIteration:
return
[docs] @overrides(AbstractList.get_default)
def get_default(self):
return self._operation(
self._left.get_default(), self._right.get_default())