#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (C) 2017 David Toro <davsamirtor@gmail.com>
# compatibility with python 2 and 3
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from builtins import object
from six import with_metaclass
from past.builtins import basestring
from past.utils import old_div
# import build-in modules
import gc
import sys
from abc import ABCMeta
from functools import wraps
from threading import RLock
from ordered_set import OrderedSet
from collections import MutableMapping, MutableSet, namedtuple, OrderedDict, deque
from weakref import ref, WeakValueDictionary, KeyedRef, _IterationGuard # https://stackoverflow.com/a/36788452/5288758
# import third party modules
from .geometry import cnt_check_intersection
from combomethod import combomethod # https://stackoverflow.com/q/2589690/5288758
from scipy.spatial.distance import euclidean
import numpy as np
import cv2
(cv_major_ver, cv_minor_ver, cv_subminor_ver) = cv2.__version__.split('.')
PY3 = sys.version_info[0] == 3
if PY3:
xrange = range
# special variables
# __all__ = []
__author__ = "David Toro"
# __copyright__ = "Copyright 2017, The <name> Project"
# __credits__ = [""]
__license__ = "GPL"
# __version__ = "1.0.0"
__maintainer__ = "David Toro"
__email__ = "davsamirtor@gmail.com"
# __status__ = "Pre-release"
#class HashableDict(dict):
# def __hash__(self):
# return id(self)
#
#
#class HashableOrderedDict(OrderedDict):
# def __hash__(self):
# return id(self)
[docs]class SpaceHandle(MutableMapping):
"""handle names"""
# https://stackoverflow.com/a/3387975/5288758
def __init__(self, parent, handle):
self.parent = parent
self.handle = handle
[docs] def change_name(self, old_name, new_name, obj):
self.handle[new_name] = self.handle.pop(old_name)
[docs] def remove_name(self, name):
#self.handle.pop(name)
self.parent._space_remove_child(self.handle[name]())
def __getitem__(self, key):
return self.handle[self.__keytransform__(key)]
def __setitem__(self, key, value):
self.handle[self.__keytransform__(key)] = value
def __delitem__(self, key):
del self.handle[self.__keytransform__(key)]
def __iter__(self):
return iter(self.handle)
def __len__(self):
return len(self.handle)
def __keytransform__(self, key):
return key
def __hash__(self):
return id(self)
[docs]class WeakWatcher(KeyedRef):
__slots__ = ('real_data', )
def __new__(type, ob, callback=None, key=None, real_data=None):
self = ref.__new__(type, ob, callback)
self.key = key
self.real_data = real_data
return self
def __init__(self, ob, callback=None, key=None, real_data=None):
super(WeakWatcher, self).__init__(ob, callback, key)
def __call__(self, *args, **kwargs):
ans = super(WeakWatcher, self).__call__(*args, **kwargs)
if ans is None:
return
if self.real_data is not None:
return self.real_data
return ans
def _create_new(self, callback=None, key=None):
real_data = self.real_data
self.real_data = None # force watch data to appear
watch_data = self()
return self.__class__(watch_data, callback, key, real_data)
[docs]class WeakWatcherWithData(WeakWatcher):
def __new__(type, ob, callback=None, key=None, real_data=None, **kwargs):
self = ref.__new__(type, ob, callback)
self.__dict__.update(kwargs)
self.key = key
self.real_data = real_data
return self
def __init__(self, ob, callback=None, key=None, real_data=None, **kwargs):
super(WeakWatcher, self).__init__(ob, callback, key)
def _create_new(self, callback=None, key=None):
real_data = self.real_data
self.real_data = None # force watch data to appear
return self.__class__(self(), callback, key, real_data,
**self.__dict__)
[docs]class WeakWatcherDictionary(WeakValueDictionary):
"""Mapping class that references values weakly.
Entries in the dictionary will be discarded when no strong
reference to the value exists anymore
"""
# We inherit the constructor without worrying about the input
# dictionary; since it uses our .update() method, we get the right
# checks (if the other dictionary is a WeakValueDictionary,
# objects are unwrapped on the way out, and we always wrap on the
# way in).
def __setitem__watcher(self, key, value, dic=None):
if dic is None:
dic = self.data
if isinstance(value, WeakWatcher):
dic[key] = value._create_new(callback=self._remove, key=key)
elif isinstance(value, ref):
dic[key] = KeyedRef(value(), self._remove, key)
else:
dic[key] = KeyedRef(value, self._remove, key)
def __setitem__(self, key, value):
if self._pending_removals:
self._commit_removals()
self.__setitem__watcher(key, value)
[docs] def setdefault(self, key, default=None):
try:
wr = self.data[key]
except KeyError:
if self._pending_removals:
self._commit_removals()
self.__setitem__watcher(key, default)
return default
else:
return wr()
[docs] def update(*args, **kwargs):
if not args:
raise TypeError("descriptor 'update' of 'WeakValueDictionary' "
"object needs an argument")
self, args = args[0], args[1:]
if len(args) > 1:
raise TypeError(
'expected at most 1 arguments, got %d' % len(args))
dict = args[0] if args else None
if self._pending_removals:
self._commit_removals()
d = self.data
if dict is not None:
if not hasattr(dict, "items"):
dict = type({})(dict)
for key, o in dict.items():
self.__setitem__watcher(key, o, dic=d)
if len(kwargs):
self.update(kwargs)
[docs]class WeakRefDictionary(WeakWatcherDictionary):
"""Mapping class that references values weakly.
Entries in the dictionary will be discarded when no strong
reference to the value exists anymore
"""
# We inherit the constructor without worrying about the input
# dictionary; since it uses our .update() method, we get the right
# checks (if the other dictionary is a WeakValueDictionary,
# objects are unwrapped on the way out, and we always wrap on the
# way in).
def __hash__(self):
return id(self)
def __getitem__(self, key):
wr = self.data[key]
o = wr()
if o is None:
raise KeyError(key)
else:
return wr
[docs] def copy(self):
new = WeakValueDictionary()
for key, wr in self.data.items():
o = wr()
if o is not None:
new[key] = wr
return new
__copy__ = copy
def __deepcopy__(self, memo):
from copy import deepcopy
new = self.__class__()
for key, wr in self.data.items():
o = wr()
if o is not None:
new[deepcopy(key, memo)] = wr
return new
[docs] def get(self, key, default=None):
try:
wr = self.data[key]
except KeyError:
return default
else:
o = wr()
if o is None:
# This should only happen
return default
else:
return wr
[docs] def items(self):
with _IterationGuard(self):
for k, wr in self.data.items():
v = wr()
if v is not None:
yield k, wr
[docs] def values(self):
with _IterationGuard(self):
for wr in self.data.values():
obj = wr()
if obj is not None:
yield wr
[docs] def popitem(self):
if self._pending_removals:
self._commit_removals()
while True:
key, wr = self.data.popitem()
o = wr()
if o is not None:
return key, wr
[docs] def pop(self, key, *args):
if self._pending_removals:
self._commit_removals()
try:
wr = self.data.pop(key)
except KeyError:
if args:
return args[0]
raise
if wr() is None:
raise KeyError(key)
else:
return wr
[docs] def setdefault(self, key, default=None):
try:
wr = self.data[key]
except KeyError:
if self._pending_removals:
self._commit_removals()
self.__setitem__watcher(key, default)
return self.data[key]
else:
return wr
[docs]class Space(with_metaclass(MetaSpace, object)):
"""
Anything that is created must have a name attribute and be in the Space
"""
_space_entities = WeakValueDictionary()
_space_collect_entities = True
def __new__(cls, *args, **kwargs):
self = super(Space, cls).__new__(cls)
self._space_parent_ = None # Space object can only have one parent
self._space_entities[str(id(self))] = self
self._space_children = SpaceHandle(self, WeakRefDictionary()) # Space children
self._space_name_handles = set() # Space positions
return self
def _space_correct_hierarchy(self, old_hierarchy):
"""
private function used to correct an old hierarchy of this
object and all its children.
:param old_hierarchy: string of the previous hierarchy
"""
# register new hierarchy
new_hierarchy = self._space_hierarchy()
# change old hierarchy
self._space_entities[new_hierarchy] = self._space_entities.pop(old_hierarchy)
# change children's old hierarchies
for i in self._space_children.values():
i = i()
old = new_hierarchy + "." + i.name
new = old_hierarchy + "." + i.name
self._space_entities[old] = self._space_entities.pop(new)
@property
def name(self):
# unique name to exist in one place in the space
try:
return self._name
except AttributeError:
return str(id(self))
@name.setter
def name(self, value):
"""change name of object which by default is id"""
if value and value != self.name:
# check it is a valid name
if not isinstance(value, basestring):
raise TypeError("name must be a string not {}".format(type(value)))
special = "."
if any(i in value for i in special):
raise NameError("name must not contain {}".format(special))
# check name is not in hierarchical conflict
if self._space_parent_ is None:
# only check in hierarchy if self is in Space
# because in-place conflict check tests the other cases
future_hierarchy = self._space_hierarchy(value, use_name=False)
if self._space_hierarchy_in_space(future_hierarchy):
parent = self._space_hierarchy(None, use_name=False)
if not parent:
parent = Space.__name__
raise KeyError("name '{}' already exists in parent '{}'".format(value, parent))
elif any(value in i for i in self._space_name_handles):
# check name is not in in-place conflict
raise KeyError("name '{}' already exists in managed names".format(value))
# METHOD: IN-PLACE
# get old name
old_name = self.name
# correct name in handles
for handle in self._space_name_handles:
handle.change_name(old_name, value, self)
#handle[value] = handle.pop(old_name)
# METHOD: HIERARCHICAL
# get old hierarchy
old_hierarchy = self._space_hierarchy()
# assign new name
self._name = value
# correct all names
self._space_correct_hierarchy(old_hierarchy)
self._name_changed_event(old_name)
@staticmethod
def _space_collect():
"""This process is expensive so do not use if not needed"""
gc.collect() # force to collect and update space
def _space_hierarchy(self, key=None, use_name=True):
"""
get hierarchy of this object
:param key: name of append in hierarchy. None to not append
:param use_name: whether to use the name of this object. That is
if True get the hierarchy until me and append key=child,
if False get the hierarchy until parent and append key=brother
:return: hierarchy
"""
# support to find parent hierarchy
parent = self._space_parent_
train = []
if parent is not None:
train.append(parent._space_hierarchy())
if use_name: train.append(self.name)
if key is not None:
train.append(key)
return ".".join(train)
def _space_hierarchy_in_space(self, hierarchy):
"""
check a hierarchy is already in the space
:param hierarchy: string of hierarchy
:return: True if hierarchy in space else False
"""
# do not proceed if there is a conflict in hierarchy
if hierarchy in self._space_entities:
# ensure hierarchy is updated to use self._space_entities
if self._space_collect_entities:
self._space_collect()
# previous evaluation was lazy, this ones evaluates
# with updated self._space_entities
return hierarchy in self._space_entities
else:
return False
def _space_get_item(self, key=None):
"""
get an name from space of object and their children.
:param key: string of Space object name to lookup. Use "."
to move under a parent or separate parents using a list.
Use "*" to find possible matches.
e.g. "Grand-space-parent.space-parent.my name"
["Grand-space-parent","space-parent","my name"]
"Grand-space-parent.space-parent.my*e"
["Grand-space-parent","space-parent","my*e"]
"Grand-space-parent.space-pa*"
["Grand-space-parent","space-pa*"]
:return:
"""
if isinstance(key, basestring):
# METHOD: HIERARCHICAL
#return self._space_entities[self._space_hierarchy(key)]
key = key.split(".")
# METHOD: IN PLACE
k = key.pop(0)
if key:
return self._space_children[k]()._space_get_item(key)
try:
index = k.index("*")
starts = k[:index]
ends = k[index+1:]
return {i: j[1]() for i, j in self._space_children.items()
if i.startswith(starts) and i.endswith(ends)}
except ValueError:
return self._space_children[k]()
@combomethod
def _space_get_from_hierarchy(self, key):
"""
get an name from hierarchy of object. If called from class
it looks-up object starting from outer Space.
:param key: string of Space object name to lookup. Use "."
to move under a parent and "*" to find possible matches.
e.g. "Grand-parent.parent.my name"
"Grand-parent.parent.my*e"
"Grand-parent.pa*"
:return:
"""
# https://stackoverflow.com/a/7374385/5288758
if isinstance(self, Space):
# get parent hierarchy instead of current hierarchy
key = self._space_hierarchy(key, use_name=False)
try:
index = key.index("*")
starts = key[:index]
ends = key[index+1:]
return {i:j for i, j in self._space_entities.items()
if i.startswith(starts) and i.endswith(ends)}
except ValueError:
return self._space_entities[key]
@property
def _space_parent(self):
# unique parent for hierarchical lookup
try:
if self._space_parent_ is None:
# not having a parent is the same as None
# and None must refer to Space
return Space
return self._space_parent_
except AttributeError:
return Space
@_space_parent.setter
def _space_parent(self, value):
"""
add a parent to this object to appear in its hierarchy as
a child.
:param value: parent Space object. None to specify outer Space
"""
if value is Space:
value = None
if value != self._space_parent_:
if value is self:
raise ValueError("parent cannot be itself")
if value is not None and self._space_parent_in_parents(value):
raise ValueError("circular reference in parents")
# get future position in hierarchy
if value is None:
future_hierarchy = self.name
#else:
# future_hierarchy = value._space_hierarchy(key=self.name)
# do not proceed if there is a conflict in hierarchy
if self._space_hierarchy_in_space(future_hierarchy):
raise KeyError("conflicting name '{}' in children of "
"parent '{}'".format(self.name, Space.__name__))
# METHOD: IN-PLACE
# register child in parent
else: # if value is not None:
# this raises if there is a conflicting
# child of the same name but it can
# be thrown by the hierarchy tree
value._space_add_child(self)
# unregister child in old parent
old_parent = self._space_parent_
if old_parent is not None:
old_parent._space_remove_child(self)
# METHOD: HIERARCHICAL
# get old hierarchy
old_hierarchy = self._space_hierarchy()
# assign new parent
self._space_parent_ = value
# correct all names
self._space_correct_hierarchy(old_hierarchy)
# call event
self._parent_changed_event(old_parent)
def _space_parent_in_parents(self, parent):
"""
check a parent is already in the family tree of this object
:param parent: parent to check in tree
:return: True if parent in tree else False
"""
# do not proceed if there is circular reference for parent
parent_ = parent._space_parent_
if parent_ is None:
return False
elif parent_ is self:
return True
else:
# keep looking into father until found
return self._space_parent_in_parents(parent_)
def _space_add_child(self, child):
"""
add an object in the internal children to appear in the Space
as inside this object.
:param child: child Space object
"""
name = child.name
if name in self._space_children:
if self._space_children[name]() is not child:
raise KeyError("conflicting name '{}' in children of "
"parent '{}'".format(child.name, self.name))
else:
#print("Warning: {} already in {}".format(child, self))
struct = self._space_children[name]
struct._count += 1
return
child_ref = WeakWatcherWithData(child)
child_ref._count = 0
self._space_children[name] = child_ref
child._space_name_handles.add(self._space_children)
def _space_remove_child(self, child):
"""
remove an object in the internal children to disappear in the Space
as inside this object.
:param child: child Space object
"""
name = child.name
struct = self._space_children[name]
struct._count -= 1
if struct._count <= 0:
del self._space_children[name]
child._space_name_handles.remove(self._space_children)
def _space_delete(self):
"""
delete Space object to outer Space. That is from all Space
objects like Groups, parents and containers.
"""
self._space_parent = None # break any parent relationship
name = self.name
# delete object from all handles, that is the Space in general
# self._space_name_handles will change size until it reaches 0
for handle in list(self._space_name_handles):
handle.remove_name(name)
def _name_changed_event(self, old_name):
"""
called when name is changed
:param old_name: old name of object. new name cam be accessed
as object.name
"""
return
def _parent_changed_event(self, old_parent):
"""
called when parent is changed
:param old_parent: old parent of object. If None it is outer Space
"""
return
[docs]def deco_name(func, ismethod=True):
"""
wrap function to give always 'name' variable
:param func:
:param ismethod:
:return:
"""
if ismethod:
@wraps(func)
def _func(self, *args, **kwargs):
name = kwargs.pop("name", None)
return func(self, name, *args, **kwargs)
else:
@wraps(func)
def _func(*args, **kwargs):
name = kwargs.pop("name", None)
return func(name, *args, **kwargs)
return _func
[docs]class GroupHandle(SpaceHandle):
[docs] def change_name(self, old_name, new_name, obj):
dict_to_update = self.parent.names
if self.handle is dict_to_update and isinstance(dict_to_update, OrderedDict):
# ordered dict cannot be destroyed and must keep the same order
i = self.parent.index(old_name)
items = list(dict_to_update.items())
old_name_, val = items[i]
if old_name != old_name_ or obj is not val:
raise RuntimeError("Group change_name operation failed "
"because its index method is broken")
items[i] = (new_name, val) # update new name
# because OrderedDict order cannot be altered
# then we need to clear it and re-insert the items
dict_to_update.clear()
dict_to_update.update(items)
else:
# assume it is a simple dict
self.handle[new_name] = self.handle.pop(old_name)
[docs] def remove_name(self, name):
self.parent.discard(name)
[docs]class Group(Space, MutableSet):
"""
Create group of objects withing the Space. This group can contain Space
objects and organize them hierarchically by assigning them as children,
as contained or simply adding them as private objects which cannot be
looked up in the Space.
The Group can be seen as an Ordered set that can iterate them as list
and retrieve objects by name or reference as in dictionaries.
"""
# https://stackoverflow.com/a/3387975/5288758
# https://github.com/LuminosoInsight/ordered-set/blob/master/ordered_set.py
# consider https://stackoverflow.com/a/11560258/5288758
def __init__(self, iterable=None, as_parent=False, as_contained=False):
# A list of keys to be removed safely. This should be Thread safe!
self._pending_removals = []
self._iterating = set()
self.lock_iteration = RLock()
def _remove(wr, selfref=ref(self)):
self = selfref()
if self is not None:
if self._iterating:
self._pending_removals.append(wr.key)
else:
del self[wr.key]
self._remove = _remove # callback to give to ref objects
# dictionary-like object to allocate names
if not hasattr(self, "names"):
self.names = OrderedDict()
# wrap names to a handle registered to this Group
self._group_handle = self._create_group_handle(self.names)
# allocate new data in Group
if iterable is not None:
#self |= iterable
self.update(iterable, as_parent=as_parent,
as_contained=as_contained)
def _create_group_handle(self, handle):
"""
create a GroupHandle wrapping a normal handle to this Group
:param handle: dictionary-like handle to be wrapped and
registered to this Group
:return: GroupHandle instance
"""
return GroupHandle(self, handle)
[docs] def give_remove_handle(self, key):
"""
give handle to safely remove key from Group. This should
be thread safe and even ig Group is on iterations.
"""
def remove(keyref=ref(key), selfref=ref(self)):
self = selfref()
key = keyref()
if self is not None and key is not None:
if self._iterating:
self._pending_removals.append(key)
else:
del self[key]
s = "remove " + str(key)
remove.__name__ = s
remove.__doc__ = s
return remove
def _commit_removals(self):
"""
remove pending removals when it is safe
"""
l = self._pending_removals
# We shouldn't encounter any KeyError, because this method should
# always be called *before* mutating the dict.
while l:
op, val = l.pop()
if op:
val() # execute operation
else:
# discard key
self._safe_discard(val)
def __len__(self):
return len(self.names)
def _getitem_name(self, name):
"""
this is used instead of self.names[key]
to show the user an adequate error
"""
try:
return self.names[name]
except KeyError:
raise KeyError("name {} is not in {}".format(name, self))
def __getitem__(self, index):
"""
Get the item at a given index.
If `index` is a slice, you will get back that slice of items.
If it's the slice [:], exactly the same object is returned.
(If you want an independent copy of a Group, use `Group.copy()`.)
If `index` is an iterable, you'll get a list of items
corresponding to those indices. This is similar to NumPy's
"fancy indexing" except the same object time is not returned.
"""
if isinstance(index, basestring):
# if looking by name
return self._getitem_name(index)
elif hasattr(index, '__index__') or isinstance(index, slice):
# if asked for a slice of the data
# it is not returned in the same class as they are registered
# in the space populating it
return list(self.names.values())[index]
elif hasattr(index, '__iter__'):
# if asked for a list of the elements
return [self[i] for i in index]
elif isinstance(index, Space) and self._getitem_name(index.name) is index:
# if index is an object, for membership check
return index
else:
raise KeyError("'{}' is not in {}".format(index, self))
[docs] def copy(self):
"""copy Group contents"""
return self.__class__(self)
def __getstate__(self):
if len(self) == 0:
# The state can't be an empty list.
# needs a truthy value, or else __setstate__ won't be run.
return (None,)
else:
return list(self)
def __setstate__(self, state):
if state == (None,):
self.__init__([])
else:
self.__init__(state)
def __contains__(self, key):
if isinstance(key, basestring):
return key in self.names
else:
try:
return self.names[key.name] is key
except KeyError:
return False
[docs] def add_as_child(self, key):
"""
assign this Group as parent of key and add it to the Group
"""
key._space_parent = self
return self.add(key)
[docs] def add_as_contained(self, key):
"""
assign this Group as container of key and add it to the Group
"""
self._space_add_child(key)
return self.add(key)
[docs] def add_as(self, key, parent=False, contained=False):
"""
add key to the Group as contained or Group as a parent or both
"""
if parent:
key._space_parent = self
if contained:
self._space_add_child(key)
return self.add(key)
def _safe_add(self, key):
"""
implementation of add method to be run safely
"""
if not isinstance(key, Space):
raise TypeError("object must be from Space not '{}'".format(key))
if key.name in self.names:
if self.names[key.name] is not key:
raise ValueError("name conflict. there is already an object "
"with name '{}' in this Group".format(key.name))
else:
self.names[key.name] = key
key._space_name_handles.add(self._group_handle)
return key
[docs] def add(self, key):
"""
Add `key` as an item to this Group, then return the key.
If `key` is already in the Group, does not adds and returns the key
"""
if self._iterating:
self._pending_removals.append((1, lambda: self._safe_add(key)))
else:
return self._safe_add(key)
append = add
[docs] def update(self, sequence, as_parent=False, as_contained=False):
"""
Update the Group with the given iterable sequence, then return
the returned value by self.add of the last element inserted.
"""
returned = None
#try:
for item in sequence:
returned = self.add_as(item, parent=as_parent,
contained=as_contained)
#except TypeError:
# raise ValueError('Argument needs to be an iterable, '
# 'got %s' % type(sequence))
return returned
[docs] def index(self, key):
"""
Get the index of a given entry, raising an IndexError if it's not
present.
`key` can be an iterable of entries that is not a string, in which case
this returns a list of indices.
"""
if (hasattr(key, '__iter__') and not isinstance(key, str) and
not isinstance(key, tuple)):
return [self.index(subkey) for subkey in key]
elif key not in self:
raise IndexError("{} not in {}".format(key, self))
elif isinstance(key, basestring):
# is name
for i, j in enumerate(self.names.keys()):
if j == key:
return i
else:
# is agent
for i, j in enumerate(self.names.values()):
if j is key:
return i
def _safe_pop(self):
"""
implementation of pop method to be run safely
"""
if not self.names:
raise KeyError('Group is empty')
key, value = self.names.popitem()
value._space_name_handles.remove(self._group_handle)
return value
[docs] def pop(self):
"""
Remove and return the last element from the Group.
Raises KeyError if the Group is empty.
"""
if self._iterating:
self._pending_removals.append((1, self._safe_pop))
else:
return self._safe_pop()
def _safe_discard(self, key):
"""
implementation of discard method to be run safely
"""
if not isinstance(key, basestring):
key = key.name
try:
value = self.names.pop(key)
except KeyError:
return
value._space_name_handles.remove(self._group_handle)
return value # sets return None but this can return value
[docs] def discard(self, value):
"""
Remove an element. Do not raise an exception if absent.
The MutableSet mixin uses this to implement the .remove() method, which
*does* raise an error when asked to remove a non-existent item.
"""
if self._iterating:
self._pending_removals.append((0, value))
else:
return self._safe_discard(value)
def _safe_clear(self):
"""
implementation of clear method to be run safely
"""
for i in self.names.values():
i._space_name_handles.remove(self._group_handle)
self.names.clear()
[docs] def clear(self):
"""
Remove all items from this Group.
"""
if self._iterating:
self._pending_removals.append((1, self._safe_clear))
else:
return self._safe_clear()
[docs] def clear_in_space(self):
"""
clear objects from group, other groups and space
"""
for o in self:
o._space_delete()
def _safe_iter(self):
"""
implementation of __iter__ method to be run safely
"""
return self.names.values()
def __iter__(self):
def safe_iter(self):
res = None
try:
res = self.lock_iteration.acquire(blocking=False)
with _IterationGuard(self):
for i in self._safe_iter():
yield i
finally:
if res:
self.lock_iteration.release()
return iter(safe_iter(self))
[docs] def reverse(self):
items = list(reversed(self.names.items()))
self.names.clear()
self.names.update(items)
def __reversed__(self):
return reversed(self.names.values())
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, list(self))
def __str__(self):
name = getattr(self, "_name", "")
if name:
name = "<{}>".format(name)
if not self:
return '%s%s()' % (self.__class__.__name__, name)
return '%s%s(%r)' % (self.__class__.__name__, name, list(self))
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.names == other.names
try:
other_as_set = set(other)
except TypeError:
# If `other` can't be converted into a set, it's not equal.
return False
else:
return set(self) == other_as_set
def __enter__(self):
self.lock_iteration.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock_iteration.release()
[docs]class CompleteGroup(Group):
"""
Class to create Groups with faster facilities for indexing
and retrieving from indexes (faster retrieval) at the expense of slightly
slower times when adding Space objects and an slight increase of memory
usage. The difference with a pure Group is negligible when managing small
amounts of data. Use this class when the focus is manipulating indexes and
comparing data withing or among Groups. For intensive adding and removal
of objects use a pure Group.
"""
def __init__(self, iterable=None, as_parent=False, as_contained=False):
self.items = [] # keep items
self.map = {} # keep item and indexes
self.names = dict() # keep index names and items
super(CompleteGroup, self).__init__(iterable, as_parent, as_contained)
def __getitem__(self, index):
"""
Get the item at a given index.
If `index` is a slice, you will get back that slice of items.
If it's the slice [:], exactly the same object is returned.
(If you want an independent copy of a Group, use `Group.copy()`.)
If `index` is an iterable, you'll get a list of items
corresponding to those indices. This is similar to NumPy's
"fancy indexing" except the same object time is not returned.
"""
if isinstance(index, basestring):
# if looking by name
return self._getitem_name(index)
elif hasattr(index, '__index__') or isinstance(index, slice):
# if asked for a slice of the data
# it is not returned in the same class as they are registered
# in the space populating it
return self.items[index]
elif hasattr(index, '__iter__'):
# if asked for a list of the elements
return [self[i] for i in index]
elif index in self.map:
# if index is an object, for membership check
return index
else:
raise KeyError("'{}' is not in {}".format(index, self))
def __contains__(self, key):
if isinstance(key, basestring):
return key in self.names
else:
return key in self.map
[docs] def index(self, key):
"""
Get the index of a given entry, raising an IndexError if it's not
present. `key` can be an iterable of entries that is not a string,
in which case this returns a list of indices.
"""
if (hasattr(key, '__iter__') and not isinstance(key, str) and
not isinstance(key, tuple)):
return [self.index(subkey) for subkey in key]
# key can be anything supported by group
try:
return self.map[self[key]]
except KeyError:
raise IndexError("{} not in {}".format(key, self))
def _safe_add(self, key):
"""
implementation of add method to be run safely
"""
if not isinstance(key, Space):
raise TypeError("object must be from Space not '{}'".format(key))
if key.name in self.names:
if self.names[key.name] is not key:
raise ValueError("name conflict. there is already an object "
"with name '{}' in this Group".format(key.name))
else: # if key not in self.map:
self.names[key.name] = key
key._space_name_handles.add(self._group_handle)
self.map[key] = len(self.items)
self.items.append(key)
return self.map[key]
def _safe_pop(self):
"""
implementation of pop method to be run safely
"""
if not self.items:
raise KeyError('Group is empty')
elem = self.items[-1]
del self.items[-1]
del self.map[elem]
del self.names[elem.name]
elem._space_name_handles.remove(self._group_handle)
return elem
def _safe_discard(self, key):
"""
implementation of discard method to be run safely
"""
if isinstance(key, basestring):
try:
key = self.names[key]
except KeyError:
return # not deleted
try:
i = self.map[key]
except KeyError:
return # not deleted
del self.items[i]
del self.map[key]
del self.names[key.name]
key._space_name_handles.remove(self._group_handle)
for k, v in self.map.items():
if v >= i:
self.map[k] = v - 1
return key # sets return None but this can return value
def _safe_clear(self):
"""
implementation of clear method to be run safely
"""
names = self._group_handle
for i in self.items:
i._space_name_handles.remove(names)
self.items.clear()
names.clear()
self.map.clear()
def _safe_iter(self):
"""
implementation of __iter__ method to be run safely
"""
return self.items
[docs] def reverse(self):
self.items.reverse()
# remap indexes
for i, key in enumerate(self.items):
self.map[key] = i
def __reversed__(self):
return reversed(self.items)
def __eq__(self, other):
if isinstance(other, self.__class__):
return len(self) == len(other) and self.items == other.items
try:
other_as_set = set(other)
except TypeError:
# If `other` can't be converted into a set, it's not equal.
return False
else:
return set(self) == other_as_set
[docs]class Agent(Space):
"""
Anything in the World with an individual behaviour which is movable and
cannot be in more than one place at a time, that is and Agent. They
can be observable and have positions in the Space. From here
anything is derived and populated in the world.
"""
def __init__(self):
self._to_compute = True
self.active = True
self.visible = True
self.drawing = None
self.cnt = None
self.rotated_box = None
def _compute(self):
"""
function executed when internal data must be updated
"""
self._to_compute = False
return
[docs] def compute(self):
"""
update internal data if necessary
:return True if computed, else false
"""
if self._to_compute:
# compute
self._compute()
# if computed
return not self._to_compute
[docs] def computed_vis(self):
pass
[docs] def raw_vis(self):
pass
[docs] @staticmethod
def get_bounding_box_from_cnt(cnt, _type=None):
return Agent.get_bounding_box_from_rotated_box(
Agent.get_rotated_box_from_cnt(cnt), _type)
[docs] @staticmethod
def get_bounding_box_from_rotated_box(rotated_box, _type=None):
(tx, ty), (sz_x, sz_y), angle = rotated_box
# get half distance to center
x, y = sz_x / 2., sz_y / 2.
if _type is None:
return tx - x, ty - y, sz_x, sz_y
else:
return _type(tx - x), _type(ty - y), _type(sz_x), _type(sz_y)
[docs] @staticmethod
def get_rotated_box_from_cnt(cnt, _type=None):
"""
get a rotated box format (center, size, angle)
from a contour of N points.
"""
if _type is None:
return cv2.minAreaRect(cnt)
else:
(cx, cy), (x, y), a = cv2.minAreaRect(cnt)
return (_type(cx), _type(cy)), (_type(x), _type(y)), _type(a)
[docs] @staticmethod
def get_rotated_box_from_bounding_box(bounding_box, _type=None):
x, y, sz_x, sz_y = bounding_box
# get half distance to center
tx, ty = sz_x / 2., sz_y / 2.
if _type is None:
return (x + tx, y + ty), (sz_x, sz_y), 0
else:
return ((_type(x + tx), _type(y + ty)),
(_type(sz_x), _type(sz_y)), _type(0))
[docs] @staticmethod
def get_cnt_from_rotated_box(rotated_box, _type=np.int32):
"""
get a contour of 4 points with format [left-top, right-top,
right-bottom, left-bottom] from a rotated box with format
(center, size, angle)
"""
# format (x,y),(width,height),theta e.g. ((122, 239), (4, 4), 0)
(tx, ty), (sz_x, sz_y), angle = rotated_box
# get half distance to center
x, y = sz_x/2., sz_y/2.
# construct contour in origin
cnt_rect = np.array([((-x, -y),), ((x, -y),),
((x, y),), ((-x, y),)], np.float)#.reshape(-1, 1, 2)
# create transformation matrix to rotate and translate
# https://en.wikipedia.org/wiki/Transformation_matrix
a = angle * np.pi / 180. # convert from degrees to radians
c = np.cos(a)
s = np.sin(a)
H = np.array([(c, -s, tx),
(s, c, ty),
(0, 0, 1)], np.float)
# apply transformation with desired type
# https://docs.opencv.org/2.4/modules/core/doc/operations_on_arrays.html#perspectivetransform
if _type is None:
return cv2.perspectiveTransform(cnt_rect, H)
else:
return _type(cv2.perspectiveTransform(cnt_rect, H))
[docs] @staticmethod
def get_cnt_from_bounding_box(bounding_box, _type=np.int32):
return Agent.get_cnt_from_rotated_box(
Agent.get_rotated_box_from_bounding_box(bounding_box), _type)
def __json_enco__(self):
pass
Point = namedtuple('Point', ('x', 'y', 'z'))
[docs]class TailItem(object):
"""TailItem(cnt, rbox, bbox, pt) which behaves like cnt"""
# based from recordtype('TailItem', ['cnt', 'rbox', 'bbox', 'pt'])
# why use __slots__
# https://stackoverflow.com/a/1816648/5288758
_fields = ('cnt', 'rbox', 'bbox', 'pt')
__slots__ = ["_"+i for i in _fields]
def __init__(self, cnt=None, rbox=None, bbox=None, pt=None):
if cnt is None and rbox is None and bbox is None:
raise Exception("must provide at least cnt, rbox or bbox")
self._cnt = cnt
self._rbox = rbox
if bbox is not None:
bbox = tuple(bbox)
self._bbox = bbox
if pt is not None and not isinstance(pt, Point):
pt = Point(*pt) # must be iterable
self._pt = pt
[docs] def point_inside(self, point):
"""
test whether point is inside contour
:param point: point or x-coordinate, y-coordinate
:return: True if inside or the border of contour, else False
"""
try:
return cv2.pointPolygonTest(self.cnt, point.pt[:2], False) != -1
except AttributeError:
return cv2.pointPolygonTest(self.cnt, point[:2], False) != -1
[docs] def cnt_intersect(self, cnt):
"""
test whether internal cnt is intersected with external cnt
:param cnt: external contour
:return: True if contours intersect, else False
"""
try:
return cnt_check_intersection(self.cnt, cnt.cnt) # TailItem object
except AttributeError:
return cnt_check_intersection(self.cnt, cnt)
[docs] def cnt_near(self, cnt, min_dist=None):
try:
(x2, y2), r2 = cnt.enclosing_circle() # TailItem object
except AttributeError:
(x2, y2), r2 = cv2.minEnclosingCircle(cnt)
(x1, y1), r1 = self.enclosing_circle()
if min_dist is None:
min_dist = (r1+r2)/2.
row1 = ((x1-r1, y1), (x1, y1+r1), (x1+r1, y1), (x1, y1-r1))
row2 = ((x2-r2, y2), (x2, y2+r2), (x2+r2, y2), (x2, y2-r2))
min_dt = np.inf
for i in row1:
for j in row2:
dt = euclidean(i, j)
if dt < min_dt:
min_dt = dt
if dt < min_dist:
return True, min_dt
return False, min_dt
[docs] def point_near(self, point, min_dist=50):
try:
pt = point.pt[:2] # TailItem object
except AttributeError:
pt = point[:2]
(x1, y1), r1 = self.enclosing_circle()
if min_dist is None:
min_dist = r1
row1 = ((x1-r1, y1), (x1, y1+r1), (x1+r1, y1), (x1, y1-r1))
min_dt = np.inf
for i in row1:
dt = euclidean(i, pt)
if dt < min_dt:
min_dt = dt
if dt < min_dist:
return True, min_dt
return False, min_dt
[docs] def enclosing_circle(self):
# ((x, y), radius)
return cv2.minEnclosingCircle(self.cnt)
@property
def cnt(self):
if self._cnt is None:
if self._rbox is not None:
self._cnt = Agent.get_cnt_from_rotated_box(self._rbox)
elif self._bbox is not None:
self._cnt = Agent.get_cnt_from_bounding_box(self._bbox)
else:
raise RuntimeError("no enough information")
return self._cnt
@property
def rbox(self):
if self._rbox is None:
if self._cnt is not None:
self._rbox = Agent.get_rotated_box_from_cnt(self._cnt)
elif self._bbox is not None:
self._rbox = Agent.get_rotated_box_from_bounding_box(self._bbox)
else:
raise RuntimeError("no enough information")
return self._rbox
@property
def bbox(self):
if self._bbox is None:
if self._cnt is not None:
self._bbox = Agent.get_bounding_box_from_cnt(self._cnt)
elif self._rbox is not None:
self._bbox = Agent.get_bounding_box_from_rotated_box(self._rbox)
else:
raise RuntimeError("no enough information")
return self._bbox
@property
def pt(self):
if self._pt is None:
M = cv2.moments(self.cnt)
z = M["m00"]
try:
x, y = (int(old_div(M["m10"], M["m00"])),
int(old_div(M["m01"], M["m00"])))
except ZeroDivisionError:
# FIXME zerodivision
print("ZeroDivisionError with cnt", self.cnt)
#x, y = self.cnt[0][0]
raise
self._pt = Point(x, y, z)
return self._pt
# cnt behaviour
def __len__(self):
return len(self.cnt)
def __iter__(self):
for i in self.cnt:
yield i
def __getitem__(self, index):
return self.cnt[index]
def __eq__(self, other):
return (isinstance(other, self.__class__) and self.cnt == other.cnt
and self.rbox == other.rbox and self.bbox == other.bbox
and self.pt == other.pt)
def __ne__(self, other):
return not self == other
def __hash__(self):
return id(self)
# others
def _asdict(self):
return {'cnt': self.cnt, 'rbox': self.rbox, 'bbox': self.bbox,
'pt': self.pt}
def __repr__(self):
return "{cn}(cnt={cnt}, rbox={rbox}, bbox={bbox}, pt={pt})".format(
cn=self.__class__.__name__, **self._asdict())
# persistence
def __getstate__(self):
return (self._cnt, self._rbox, self._bbox, self._pt)
def __setstate__(self, state):
(self._cnt, self._rbox, self._bbox, self._pt) = state