Source code for intelligent_tracker.geometry

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# (C) 2017 David Toro <>

# compatibility with python 2 and 3
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from builtins import object

# import build-in modules
import sys

# import third party modules
import numpy as np
import cv2
from collections import Counter, OrderedDict
from itertools import chain

# special variables
# __all__ = []
__author__ = "David Toro"
# __copyright__ = "Copyright 2017, The <name> Project"
# __credits__ = [""]
__license__ = "GPL"
# __version__ = "1.0.0"
__maintainer__ = "David Toro"
__email__ = ""
# __status__ = "Pre-release"

[docs]class NotInvertible(Exception): """Exception to determine if an object is not invertible""" pass
[docs]class IncompleteAssociations(Exception): """Exception to raise when a connection could not be determined""" pass
[docs]def norm_point(pt): """normalize point to a tuple (x,y)""" return tuple(pt.reshape(2))
[docs]def bezier(point, line, check=False): """ Apply bezier algorithm to return a value t from 0 to 1 in x and y, that is tx and ty, if point is inside line where t would be the percentage of the distance from point1 to point2. If point is outside line then t<0 or t>1. If line is horizontal then ty is not percentage but the distance from the horizontal and conversely if line is vertical then tx is not percentage but the distance from the vertical. :param point: point :param line: (point1, point2) :param check: True to check tx and ty and return None if point is not between point1 and point2 in the line. :return: tx, ty """ px, py = point ((x1, y1), (x2, y2)) = line dx = (x1-x2) dy = (y1-y2) # check point is inside lines using Bézier parameters # # dx and dy are 0 then line is not a line but a point tx = x1-px if dx != 0: # in x for line 1 tx /= dx if check and not 0 <= tx <= 1: # not (t>=0 and t<=1) return None elif check and px != x1: # if dx = 0 then x1 = x2 return None ty = y1-py if dy != 0: # in y for line 1 ty /= dy if check and not 0 <= ty <= 1: # not (t>=0 and t<=1) return None elif check and py != y1: # if dy = 0 then y1 = y2 return None # if it passed the tests then return True return tx, ty
[docs]def line_intersection(line1, line2, check_inside=True): """ Find the intersecting point between to lines :param line1: (line1_point1, line1_point2) :param line2: (line2_point1, line2_point2) :param check_inside: if True and the lines do not cross between their points then it is not considered an intersection and None is returned :return: point """ # ((x1, y1), (x2, y2)) = line1 ((x3, y3), (x4, y4)) = line2 L2_dx, L1_dx = (x3-x4), (x1-x2) L2_dy, L1_dy = (y3-y4), (y1-y2) det_denominator = L1_dx*L2_dy - L1_dy*L2_dx # When the two lines are parallel or coincident the denominator is zero if det_denominator == 0: return None # get numerators A = (x1*y2-y1*x2) B = (x3*y4-y3*x4) det_1 = A*L2_dx - L1_dx*B det_2 = A*L2_dy - L1_dy*B # get intersection point px = det_1/det_denominator py = det_2/det_denominator if (not check_inside or check_inside and bezier((px, py), line1, check=True) and bezier((px, py), line2, check=True)): # return intersection point return px, py return None
[docs]def cnt_group(cnt, cnt_cmp, check=False): """ compare cnt pertaining points and give the transitions :param cnt: testing cnt :param cnt_cmp: comparing cnt :param check: True to return immediately if a point from cnt is found inside cnt_cmp and with the found flag added to the returned values, True for found or False for not found and consequently with all the flags and transitions plus the found flag. :return: (flags, transitions) where flags are 1 when the points from cnt which are in cnt_cmp, 0 when when in the contour and -1 when they are not inside. The flag is determined by pointPolygonTest. transitions is an list of the indices where a flag changes from outside to inside or in the contour and vice versa. if check is True: (flags, transitions, found) """ assert len(cnt) > 1, "contour must have at least 2 points to test transitions" transitions = [] flags = np.empty(len(cnt), pt_last = norm_point(cnt[-1]) old_flag = cv2.pointPolygonTest(cnt_cmp, pt_last, False) for i, pt in enumerate(cnt): flags[i] = flag = cv2.pointPolygonTest(cnt_cmp, norm_point(pt), False) # border flag=0 is considered inside flag=1 # so there is no transition from flag=0 to flag=1 or vice-versa # but from flag=0 to flag=-1 or flag=1 to flag=-1, or vice-versa #if (flag < 0 and old_flag >= 0) or (old_flag < 0 and flag >= 0): #if old_flag != flag: if (flag == -1 and old_flag != -1) or (old_flag == -1 and flag != -1): # store where is the transition transitions.append(i) if check and flag != -1: # return until flag was found return flags[:i+1], transitions, True # update_func flag old_flag = flag if check: return flags, transitions, False return flags, transitions
[docs]def go_around(index, size, negative=False): """ Correct index to infinitely go around an array. This is equivalent to carrected_index = (index % size) but this function offers more control. :param index: index to correct :param size: size of array :param negative: True to not correct negative indexes :return: (flag, carrected_index) flag indicating that index was corrected """ if not negative or index > size - 1 or index + size < 0: new_index = index % size else: new_index = index if index != new_index: return True, new_index return False, new_index
[docs]class BasePoly(object): """ Base Class to provide basic port allocation, inversion and selection of variables transparently while inverting ports. """ def __init__(self, cnt, flags, port_left, port_right, id_left, id_right, start, stop, key): self.cnt = cnt self.ports = [port_left, port_right] self.port_ids = [self.adequate_id(id_left), self.adequate_id(id_right)] self._start = start self._stop = stop self._inverted = False self._used = False self._group_id = None self.flags = flags # unique identifier to test which points this represents if key is None: key = frozenset(self.port_ids) self.key = key
[docs] def compare_key(self, key): return key == self.key
[docs] def has_all_points_inside(self): """ returns True if all points in the lines are inside the other object """ flags = self.flags if flags is None: return True # no flags thus all points are valid there_is_one = False for i in self.indexes(): f = flags[i] if f == -1: return False if f == 1: there_is_one = True return there_is_one
[docs] @staticmethod def adequate_id(id): """ adequate or normalize id to be used with all Poly objects """ group, key = id try: return group, frozenset(key) except TypeError: return group, frozenset((key,))
@property def id_left(self): return self.port_ids[0] @id_left.setter def id_left(self, value): self.port_ids[0] = value @property def id_right(self): return self.port_ids[1] @id_right.setter def id_right(self, value): self.port_ids[1] = value @property def port_left(self): return self.ports[0] @port_left.setter def port_left(self, value): self.ports[0] = value @property def port_right(self): return self.ports[1] @port_right.setter def port_right(self, value): self.ports[1] = value
[docs] def indexes(self, indices=None, invert=None): """ generate lines' point indexes :param invert: invert generations of points :return: generator """ # control object inversion with user inversion _invert = self._inverted if invert: _invert = not _invert # init starting and stopping point with a step of 1 start, stop, step = self._start, self._stop, 1 remain = ni = len(self) # remaining indices = number of indices or len #rotated = False # flag indicating if index ind goes around array # initialize starting index if _invert: ind = stop else: ind = start # slices support if indices is not None: if isinstance(indices, slice): # for step if indices.step is not None: step = indices.step if step == 0: raise ValueError(": slice step cannot be zero") if step < 0: # if step is negative then invert previous inversion step = -step _invert = not _invert # user starting index nstart = indices.start if nstart is None: nstart = 0 # user stop index nstop = indices.stop if nstop is None or nstop > ni: nstop = ni # calculate remaining indices remain = nstop - nstart # shift starting index if nstart < ni: if _invert: ind = (stop - nstart) % len(self.cnt) #rotated, ind = go_around(stop - nstart, len(self.cnt)) else: ind = (start + nstart) % len(self.cnt) #rotated, ind = go_around(start + nstart, len(self.cnt)) else: remain = 0 # empty slice else: # here indices is really a single index f, _ = go_around(indices, ni, negative=True) if f: raise IndexError(": index {} out of range".format(indices)) # correct negative index if indices < 0: indices = ni + indices # use index if _invert: stop = (stop - indices) % len(self.cnt) ind = start = stop else: start = (start + indices) % len(self.cnt) ind = stop = start # inversion is remembered until yield is finished if _invert: if stop >= start: while remain > 0: # and ind >= start: yield ind ind -= step remain -= step else: l = len(self.cnt) while remain > 0: # and (not rotated and ind <= stop # or rotated and ind >= start): yield ind ind = (ind-step) % l #f, ind = go_around(ind-step, l) #rotated |= f remain -= step else: if stop >= start: while remain > 0: # and ind <= stop: yield ind ind += step remain -= step else: l = len(self.cnt) while remain > 0: # and (not rotated and ind >= start # or rotated and ind <= stop): yield ind ind = (ind+step) % l #f, ind = go_around(ind+step, l) #rotated |= f remain -= step
[docs] def lines_points(self, invert=None): """ generate points from contours :param invert: invert generation :return: generator """ cnt = self.cnt for i in self.indexes(invert=invert): yield cnt[i]
[docs] def process_connections(self, conns, lines): """ Process connections if they are simple from group A to B. :param conns: list of group A :param lines: list of group B :return: consumed Counts """ consumed = getattr(conns, "_used", Counter()) try: # if self is in conns then it must be an Intersection index = conns.index(self) except ValueError: # then self must be a Polyline index = None for _ in (0, 1): for ci, conn in enumerate(conns): if index != ci: # self is not conn: if self.port_right is None: try: i = conn.port_ids.index(self.id_right) except ValueError: pass else: i_expected = 0 if i == i_expected: if conn.port_left is None: self.give_port_right(conn) consumed[ci] += 1 if index is not None: consumed[index] += 1 elif conn.port_right is None: try: conn.invert() self.give_port_right(conn) consumed[ci] += 1 if index is not None: consumed[index] += 1 except NotInvertible: pass if self.port_left is None: try: i = conn.port_ids.index(self.id_left) except ValueError: continue else: i_expected = 1 if i == i_expected: if conn.port_right is None: self.give_port_left(conn) consumed[ci] += 1 if index is not None: consumed[index] += 1 elif conn.port_left is None: try: conn.invert() self.give_port_left(conn) consumed[ci] += 1 if index is not None: consumed[index] += 1 except NotInvertible: pass if self.ports_used(): break else: continue break else: if not self.ports_used(): raise IncompleteAssociations("{} {} did't find an " "adequate Association".format(type(self), self)) conns._used = consumed return consumed
[docs] def test_id(self, id, position): """ test id if is in position left or right of port_ids :param id: id to test :param position: 1 for right, 0 for left :return: True for found id in position """ return position in self.id_in_ids(id)
[docs] def id_in_ids(self, id): """ test whether id is in this Poly object and in which indices :param id: id to test :return: indices where id is in port_ids """ indices = [] cgroup, ckey = id for i, (group, key) in enumerate(self.port_ids): if cgroup == group and any((i in key for i in ckey)): indices.append(i) return indices
[docs] def give_port_right(self, parent): """ safely assign right port to parent :param parent: parent Poly object like an Intersection or Polyline """ assert parent.test_id(self.id_right, 0) or self.test_id(parent.id_left, 1) if self.port_right is None: if parent.port_left is not None: raise IndexError("Port left in guest {} already taken".format(type(parent))) self.port_right = parent parent.port_left = self else: raise IndexError("Port right in host {} already taken".format(type(self)))
[docs] def give_port_left(self, parent): """ safely assign left port to parent :param parent: parent Poly object like an Intersection or Polyline """ assert parent.test_id(self.id_left, 1) or self.test_id(parent.id_right, 0) if self.port_left is None: if parent.port_right is not None: raise IndexError("Port right in guest {} already taken".format(type(parent))) self.port_left = parent parent.port_right = self else: raise IndexError("Port left in host {} already taken".format(type(self)))
[docs] def give_port_in_index(self, index, parent): """ give port in position of index :param index: 1 for right, 0 for left :param parent: parent Poly object """ if index: # port right self.give_port_right(parent) else: # port left self.give_port_left(parent)
[docs] def give_group_id(self, group_id): """ recursively give group_id to all the connected Poly objects """ if self._group_id == group_id: return self._group_id = group_id for i, port in enumerate(self.ports): if port is not None: port.give_group_id(group_id=group_id)
[docs] def to_invert(self, parents=None): """ Though any Poly is invertible it would result in processing penalties if many Poly objects are connected together and they are inverted. Thus this functions return True if all the chain can be easily inverted or False if not. :param parents: previous parent in the chain. Control variable indicating which Poly object was the caller or the first to call to_invert to end chain. :return: True for easy to invert, False if not """ # prevents recursion if parents is None: parents = set((self,)) # for fast membership tests else: if self in parents: # return invertible because the decision # is from parent return True parents.add(self) if len(self) == 1: for port in self.ports: # the choice must be port if port is None: continue if not port.to_invert(parents): return False return True elif all((i is None for i in self.ports)): return True # if all ports are None return True else: # if ports are used and recursively they are used return False
[docs] def invert(self, parents=None, force=False): """ invert all the chain formed from the connections of Poly objects :param parents: previous parent in the chain. Control variable indicating which Poly object was the caller or the first to call to_invert to end chain. :param force: :return: """ # prevents recursion if parents is None: # only run check the first time if not force and not self.to_invert(): raise NotInvertible("Cannot be inverted") parents = [self] else: if self in parents: # return invertible because the decision # is from parent return parents.append(self) for i, port in enumerate(self.ports): if port is not None: port.invert(parents) # for port self.ports.reverse() self.port_ids.reverse() self.apply_on_invert(parents=parents) self._inverted = not self._inverted
[docs] def apply_on_invert(self, parents=None): return
[docs] def recurse_right(self, parent=None): """ recursively generate points until a round trip is completed :param parent: previous parent in the chain. Control variable indicating which Poly object was the caller or the first to call to_invert to end chain. :return: generator """ if self._used: return # PEP 479 raise StopIteration if parent is None: parent = self elif parent is self: return # PEP 479 raise StopIteration # yield points for i in self.lines_points(): yield i self._used = True port = self.port_right if port.port_right is self: for i in port.recurse_left(parent=parent): yield i elif port.port_left is self: for i in port.recurse_right(parent=parent): yield i else: raise Exception("Bad connection in {}-{}".format(self, port))
[docs] def recurse_left(self, parent=None): """ recursively generate points until a round trip is completed :param parent: previous parent in the chain. Control variable indicating which Poly object was the caller or the first to call to_invert to end chain. :return: generator """ if self._used: return # PEP 479 raise StopIteration if parent is None: parent = self elif parent is self: return # PEP 479 raise StopIteration # yield points for i in self.lines_points(invert=True): yield i self._used = True port = self.port_left if port.port_left is self: for i in port.recurse_right(parent=parent): yield i elif port.port_right is self: for i in port.recurse_left(parent=parent): yield i else: raise Exception("Bad connection in {}-{}".format(self, port))
[docs] def ports_used(self): """ returns True if left and right ports are assigned """ #return not(self.port_left is None or self.port_right is None) return all((i is not None for i in self.ports))
def __str__(self): l1, l2 = self.id_left r1, r2 = self.id_right return "<{},{}>{}<{},{}>".format(l1, l2, list(self), r1, r2) __iter__ = lines_points def __getitem__(self, indices): if isinstance(indices, slice): return [self.cnt[i] for i in self.indexes(indices=indices)] else: return self.cnt[next(self.indexes(indices=indices))] def __len__(self): """ len of the generated indexes """ if self._stop >= self._start: return self._stop - self._start + 1 else: return len(self.cnt) - self._start + self._stop + 1 __bool__ = ports_used __nonzero__ = __bool__ # compatibility with python 2
[docs]class Interception(BasePoly): """ Represents a Interception """ def __init__(self, id_left, center, id_right, key): super(Interception, self).__init__([center], None, None, None, id_left, id_right, 0, 0, key)
[docs]class PolyLine(BasePoly): """ Represents a Polyline """ def __init__(self, cnt, flags, cnt_id, start, stop): super(PolyLine, self).__init__(cnt, flags, None, None, (cnt_id, start), (cnt_id, stop), start, stop, None)
[docs]class Completeness(object): """ Search space to add BasePoly objects and find associations """ def __init__(self): self._references = {}
[docs] def add_id(self, id, item): if id is None: raise Exception("None is not id") try: ref = self._references[id] except KeyError: self._references[id] = ref = [0, set()] ref[0] += 1 ref[1].add(item)
[docs] def sub_id(self, id, item): ref = self._references[id] ref[0] += 1 ref[1].remove(item)
[docs] def register(self, item): """ register Connection ids :param item: :return: """ if (len(item) == 1 and item.id_left == item.id_right and (item.port_left is None or item.port_right is None)): self.add_id(item.id_left, item) else: if item.port_left is None: self.add_id(item.id_left, item) if item.port_right is None: self.add_id(item.id_right, item)
[docs] def unregister(self, item): """ unregister all ids from a Connection :param item: :return: """ if item.port_left is not None: self.sub_id(item.id_left, item) if item.port_right is not None: self.sub_id(item.id_right, item)
[docs] def items_counts(self): """ iterate over (id, count) """ for id, (count, ref) in self._references.items(): yield id, count
[docs] def items_connections(self): """ iterate over (id, references to connections) """ for id, (count, ref) in self._references.items(): yield id, ref
[docs] def generate_count_dictionary(self): """ get ordered dictionary of count of associations """ # get unique counts to create sorted count dictionary counts = np.sort(np.unique([i for i, j in self._references.values()])) # allocate ordered keys count_dict = OrderedDict([(c,[]) for c in counts]) # populate keys for k, (count, ref) in self._references.items(): count_dict[count].append((k, ref)) return count_dict
[docs] def generate_incomplete_set(self): """ create a set with all missing ids """ incomplete = set() for link, link_sum in self.items_counts(): if link_sum % 2: id, indexes = link try: # if there are several indexes for index in indexes: incomplete.add((id, index)) except TypeError: # if there is just one index incomplete.add(link) return incomplete
[docs] def create_associations(self): """ associate connections in all references """ count_groups = self.generate_count_dictionary() for count, groups in count_groups.items(): if count == 1: # special case: single references (int) # with general references (set) groups_ints = [] groups_sets = [] for id, refs in groups: try: if len(id[1]) == 1: groups_ints.append((id, list(refs)[0])) else: groups_sets.append((id, list(refs)[0])) except TypeError: groups_ints.append((id, list(refs)[0])) ints_in_sets = [] for id, ref in groups_ints: pos = [] ccnt_id, ckey = id for i, ((cnt_id, key), _) in enumerate(groups_sets): if ccnt_id == cnt_id and any((i in key for i in ckey)): pos.append(i) ints_in_sets.append((pos, id, ref)) ints_in_sets.sort(key=lambda x: len(x[0])) groups2 = [] for pos, id, ref in ints_in_sets: assert len(pos) < 3 while len(pos) > 0: r = pos[0] for c, _, _ in ints_in_sets: try: c.remove(r) except ValueError: pass groups2.append((id, (ref, groups_sets[r][1]))) for id, refs in groups2: i = [(r.id_in_ids(id), r) for r in refs] # sort by least connections and first Polyline i.sort(key=lambda x: (len(x[0]), not isinstance(x[1], PolyLine))) self.associate(i) elif count == 2: # special case: complete references for id, refs in groups: i = [(r.id_in_ids(id), r) for r in refs] # sort by least connections and first Polyline i.sort(key=lambda x: (len(x[0]), not isinstance(x[1], PolyLine))) self.associate(i) elif count == 3: for id, refs in groups: # in each group there is a connection # which is a Polyline with one point i = [(r.id_in_ids(id), r) for r in refs] # sort by least connections and first Polyline i.sort(key=lambda x: (len(x[0]), not isinstance(x[1], PolyLine))) indeces_l, c_l = i.pop() if len(indeces_l) == 2: # line with 2 equal ids self.associate(i[0], (indeces_l, c_l)) self.associate(i[1], (indeces_l, c_l)) else: # confused associations: these should be left for last # and only be made if counts have been reduced pill = 0 # groups[0][1] for id, refs in groups: for r in refs: if r._group_id is None: r.give_group_id(pill) # join the ones with the same pill i = [(r.id_in_ids(id), r) for r in refs if r._group_id == pill] if len(i) == 2: # there is a group to close self.associate(i) pill += 1
[docs] def associate(self, *args): if len(args) == 1: (indeces0, c0), (indeces1, c1) = args[0] elif len(args) == 2: (indeces0, c0), (indeces1, c1) = args[0], args[1] else: indeces0, c0, indeces1, c1 = args index0 = indeces0[0] p0, p1 = c0.ports, c1.ports if indeces0 == indeces1: # both have ids at the same side # invert try: c1.invert() c0.give_port_in_index(index0, c1) except NotInvertible: c0.invert(force=True) c0.give_port_in_index(not index0, c1) elif len(indeces1) == 2: # second connection is a Polyline with one point if p1[not index0] is None: # no need to invert c0.give_port_in_index(index0, c1) else: # invert try: c1.invert() c0.give_port_in_index(index0, c1) except NotInvertible: c0.invert(force=True) c0.give_port_in_index(not index0, c1) else: # no need to invert c0.give_port_in_index(index0, c1)
[docs]def cnt_check_intersection(cnt0, cnt1): """ check if normal cnt0 and cnt1 intersect """ # -1 = outside, 0 = boundary, 1 = inside # fastplt(draw_contour_groups([[cnt0],[cnt1]]), interpolation="nearest") # transitions give the id0_i0 i of the transition; line: i, i-1 flags0, trans0_raw, found = cnt_group(cnt0, cnt1, True) if found: # fastest route: when a point in contour is inside another return found flags1, trans1_raw, found = cnt_group(cnt1, cnt0, True) if found: # fastest route: when a point in contour is inside another return found # pair variables to choose with id trans_raw = trans0_raw, trans1_raw cnts = cnt0, cnt1 flags = flags0, flags1 # select id as first contour that must have transitions id = 0 if trans1_raw: id = 1 # selected variables with id id0, id1 = int(id), int(not id) id0_c, id1_c = cnts[id0], cnts[id1] id0_f, id1_f = flags[id0], flags[id1] # check if np.alltrue(id0_f == -1): # all_outside0: id0_trans_raw = np.arange(len(id0_f)) else: id0_trans_raw = trans_raw[id0] if np.alltrue(id1_f == -1): # all_outside1: id1_trans_raw = np.arange(len(id1_f)) else: id1_trans_raw = trans_raw[id1] # test connections for i0, tr0 in enumerate(id0_trans_raw): # get indexes in id0 id0_i0, id0_i1 = tr0, ((tr0 - 1) % len(id0_f)) for i1, tr1 in enumerate(id1_trans_raw): # get indexes in id1 id1_i0, id1_i1 = tr1, ((tr1 - 1) % len(id1_f)) # create line 0 in intersection l0a, l0b = id0_c[id0_i0], id0_c[id0_i1] l0 = norm_point(l0a), norm_point(l0b) # create line 1 in intersection l1a, l1b = id1_c[id1_i0], id1_c[id1_i1] l1 = norm_point(l1a), norm_point(l1b) # get intersection if line_intersection(l0, l1, check_inside=True) is not None: return True
[docs]def cnt_intersection(cnt0, cnt1): """ intersect cnt0 with cnt1 """ # -1 = outside, 0 = boundary, 1 = inside # fastplt(draw_contour_groups([[cnt0],[cnt1]]), interpolation="nearest") # transitions give the id0_i0 i of the transition; line: i, i-1 flags0, trans0_raw = cnt_group(cnt0, cnt1) flags1, trans1_raw = cnt_group(cnt1, cnt0) ## fastest route: when a contour is inside another if np.alltrue(flags0 != -1): return [cnt0] elif np.alltrue(flags1 != -1): return [cnt1] # default point shape try: default_shape = cnt0[0].shape except (AttributeError, IndexError): try: default_shape = cnt1[0].shape except (AttributeError, IndexError): default_shape = None ## special case of 2 lines #if all_outside0 and all_outside1: # here it was optimized instead of generalizing the # subsequent code to prevent overheating in other cases #pass # initialize transitions, lines and connections class EspList(list): """Special list to save statistics""" trans0 = [] trans1 = [] lines = EspList([]) conns = EspList([]) # pair variables to choose with id trans_raw = trans0_raw, trans1_raw trans = trans0, trans1 cnts = cnt0, cnt1 flags = flags0, flags1 used_flags = np.zeros_like(flags0,, np.zeros_like(flags1, # unique key format # frozenset(((id0,frozenset((id0_i0, id0_i1))), # (id1,frozenset((id1_i0, id1_i1))))) used_combinations = set() # # select id as first contour that must have transitions if trans1_raw: id = 1 else: id = 0 # selected variables with id id0, id1 = int(id), int(not id) id0_c, id1_c = cnts[id0], cnts[id1] id0_t, id1_t = trans[id0], trans[id1] id0_f, id1_f = flags[id0], flags[id1] #id0_u, id1_u = used_flags[id0], used_flags[id1] # check all_outside0 = np.alltrue(id0_f == -1) all_outside1 = np.alltrue(id1_f == -1) if all_outside0: id0_trans_raw = np.arange(len(id0_f)) else: id0_trans_raw = trans_raw[id0] if all_outside1: id1_trans_raw = np.arange(len(id1_f)) else: id1_trans_raw = trans_raw[id1] ## find connections and correct raw transitions for i0, tr0 in enumerate(id0_trans_raw): # get indexes in id0 id0_i0, id0_i1 = tr0, ((tr0 - 1) % len(id0_f)) # convert trans0_raw to trans0 # move transitions where -1 to the nearest non -1 if all_outside0: tr0_correct = i0 else: if id0_f[tr0] != -1: tr0_correct = tr0 elif id0_f[tr0-1] != -1: tr0_correct = id0_i1 # append processed transition id0_t.append(tr0_correct) for i1, tr1 in enumerate(id1_trans_raw): # get indexes in id1 id1_i0, id1_i1 = tr1, ((tr1 - 1) % len(id1_f)) # convert trans1_raw to trans1 # move transitions where -1 to the nearest non -1 if all_outside1: tr1_correct = i1 else: if id1_f[tr1] != -1: tr1_correct = tr1 elif id1_f[tr1-1] != -1: tr1_correct = id1_i1 # only process once if i0 == 0: id1_t.append(tr1_correct) # create unique key A = (id0, frozenset((id0_i0, id0_i1))) B = (id1, frozenset((id1_i0, id1_i1))) key = frozenset((A, B)) # do not repeat connections if key not in used_combinations: # create line 0 in intersection l0a, l0b = id0_c[id0_i0], id0_c[id0_i1] l0 = norm_point(l0a), norm_point(l0b) # create line 1 in intersection l1a, l1b = id1_c[id1_i0], id1_c[id1_i1] l1 = norm_point(l1a), norm_point(l1b) # get intersection inter = line_intersection(l0, l1, check_inside=True) if inter is not None: # convert intersection to the same as cnt0 or cnt1 if default_shape is not None: inter = np.array(inter).reshape(default_shape) # if inter is already one of the lines boundary # then set it to None to not repeat it. if any(np.array_equiv(inter, x) for x in [l0a, l0b, l1a, l1b]): inter = None # append new Interception conns.append(Interception((id0, tr0_correct), inter, (id1, tr1_correct), key)) #conns.append(Interception(A, inter, B, key)) #id0_u[id0_i0] += 1 #id0_u[id0_i1] += 1 #id1_u[id1_i0] += 1 #id1_u[id1_i1] += 1 # add the used combination whether it had a intersection or not used_combinations.add(key) if trans0 or trans1: ## find lines for id, t, c, f in ((0, trans0, cnt0, flags0), (1, trans1, cnt1, flags1)): # process Lines in trans0 used_lines = np.zeros_like(t) # flags to not repeat lines for i in range(-1, len(t) - 1): # cnt, flags, id, left, right l = PolyLine(c, f, id, t[i], t[i + 1]) if (l.has_all_points_inside() and not (used_lines[i] and used_lines[i + 1])): used_lines[i] = 1 used_lines[i + 1] = 1 lines.append(l) # sort from bigger to smaller to increase the chance of finding connections # sort the lines so that single points i.e a line with 1 point # are left for last... (this could potentially be risky) #lines.sort(key=lambda x: len(x), reverse=True) elif len(conns) == 1: #np.alltrue(used_flags): # there are no lines and only all connections remain # this represents intersection with pure lines where # the lines are not inside any area return [list(c) for c in conns] ## fastest solving for regular shapes # solve associations between lines and connections shape_complex = True if not shape_complex: for l in lines: try: l.process_connections(conns, lines) except IncompleteAssociations: shape_complex = True # solve connections between connections if there are # still unsolved associations if not shape_complex: remaining_conns = [c for c in conns if not c.ports_used()] for c in remaining_conns: try: c.process_connections(conns, lines) except IncompleteAssociations: shape_complex = True ## if there are lines and connections then find associations # if there are no lines then the program should not arrive here if shape_complex: # find if there are odd connections completeness = Completeness() for i in chain(*(lines, conns)): # lines + conns completeness.register(i) to_find_last = None limit = 1000 for no_iterations in range(limit): # if there are connections not included find them # before we look for all the associations to_find = completeness.generate_incomplete_set() if not to_find or to_find_last == to_find: # end search if all possible associations found break # update_func last list of indexes to_find_last = to_find for (id, id0_i0) in to_find: # select variables with id id0, id1 = int(id), int(not id) id0_c, id1_c = cnts[id0], cnts[id1] for id0_i1 in (((id0_i0-1) % len(id0_c)), ((id0_i0+1) % len(id0_c))): # create line 0 l0a, l0b = id0_c[id0_i0], id0_c[id0_i1] l0 = norm_point(l0a), norm_point(l0b) for id1_i0 in range(len(id1_c)): # create line 1 id1_i1 = ((id1_i0 - 1) % len(id1_c)) l1a, l1b = id1_c[id1_i0], id1_c[id1_i1] l1 = norm_point(l1a), norm_point(l1b) # create unique key A = (id0, frozenset((id0_i0, id0_i1))) B = (id1, frozenset((id1_i0, id1_i1))) key = frozenset((A, B)) if key not in used_combinations: # find intersection inter = line_intersection(l0, l1, check_inside=True) if inter is not None: # convert intersection to the same as cnt0 or cnt1 if default_shape is not None: inter = np.array(inter).reshape(default_shape) # if inter is already one of the lines boundary # then set it to None to not repeat it. if any(np.array_equiv(inter, x) for x in [l0a, l0b, l1a, l1b]): inter = None # append new Interception and # prevent future inversions by creating them # in the correct order if id: inter = Interception(A, inter, B, key) else: inter = Interception(B, inter, A, key) conns.append(inter) # register ids completeness.register(inter) # add the used combination whether # it had a intersection or not used_combinations.add(key) else: raise Exception("limit of {} iterations exceeded and there are" "still not complete associations".format(limit)) completeness.create_associations() tries = 1 try: # debug and prevent incomplete associations to # produce erroneous contours for j in range(tries+1): completeness = Completeness() for i in chain(*(lines, conns)): # lines + conns completeness.register(i) if not completeness._references: break elif j >= tries: raise IncompleteAssociations("Could not solve conns and lines") completeness.create_associations() except IncompleteAssociations: # this is for debugging from intelligent_tracker.figures import interactive_points from intelligent_tracker.array_utils import draw_contour_groups interactive_points(draw_contour_groups([[cnt0], [cnt1]]), lines+conns) completeness.create_associations() raise contours = [] # process all intersected cnts for l in chain(*(lines, conns)): cnt = list(l.recurse_right()) if cnt: contours.append(cnt) # check algorithm #for i in lines: i._used = False #for i in conns: i._used = False #cnt = list(conns[0].recurse_right()) #for i in lines: i._used = False #for i in conns: i._used = False #cnt_ = list(conns[0].recurse_left()) #assert check_contours([cnt], [cnt_]), "forward and reverse must yield the same indexes" #fastplt(draw_contour_groups([[cnt0], [cnt1]]), interpolation="nearest") #fastplt(draw_contour_groups([[np.array(c,] for c in contours]), interpolation="nearest") return contours
[docs]def intersect_analytical(contours): """ Intersect contours by applying purely analytic operations. Contrary to the pixel approach this should not consume much memory and it can yield more precise intersections without adding many points but it could take more time for small contours. If resolution is really big it can save memory because it does not produce accordingly big binary images to obtain the intersections. :param contours: :return: overlapped contours """ # i thing this will be the slowest and a difficult one to implement #assert len(contours) > 1 contours = contours[:] # copy #fastplt(draw_contour_groups([[c] for c in contours]), interpolation="nearest") new_contours = [contours.pop(0)] # evaluate all contours while contours: to_intersect = contours.pop() # fastplt(draw_contour_groups([[to_intersect],[new_cnt]]), interpolation="nearest") new_contours_cache = [] for new_cnt in new_contours: ans = cnt_intersection(new_cnt, to_intersect) if ans: new_contours_cache.extend([np.array(c, for c in ans]) new_contours = new_contours_cache # fastplt(draw_contour_groups([[np.array(c,] for c in new_contours]), interpolation="nearest") if not new_contours: break return new_contours
[docs]def mixed_intersections(contours, method, img): pass
[docs]def draw_drawContours(img, cnt): """drawing function used to draw cnt""" return cv2.drawContours(img, [cnt], -1, 1, -1)
[docs]def draw_fillConvexPoly(img, cnt): """ Draw contour. It cannot draw all the cnt correctly. For it to be correct it must be convex. :param img: :param cnt: :return: """ return cv2.fillConvexPoly(img, cnt, 1, cv2.LINE_4)
[docs]def draw_fillPoly(img, cnt): """drawing function used to draw cnt""" return cv2.fillPoly(img, [cnt], 1, cv2.LINE_4)
[docs]def intersect_AND(img, contours, function=draw_drawContours): """ Intersect contours by applying AND operations :param img: initial binary image :param contours: contours to overlap :param function: drawing function :return: final binary image, overlapped contours """ # create an image filled with zeros, single-channel, same size as img. blank = np.zeros(img.shape[0:2]) # evaluate all contours for cnt in contours: bn = function(blank.copy(), cnt) # now AND the two together img = np.logical_and(img, bn) return cv2.findContours(img.astype(np.uint8), mode=cv2.RETR_LIST, method=cv2.CHAIN_APPROX_SIMPLE)[:2]
[docs]def intersect_ADD(img, contours, function=draw_drawContours): """ Intersect contours by applying ADDING operations and finally thresholding :param img: initial binary image :param contours: contours to overlap :param function: drawing function :return: final binary image, overlapped contours """ # create an image filled with zeros, single-channel, same size as img. blank = np.zeros(img.shape[0:2]) num_cnt = len(contours) # evaluate all contours for cnt in contours: bn = function(blank.copy(), cnt) # add images together img += bn # pick all points that sum up to the total additions return cv2.findContours((img == (num_cnt+1)).astype(np.uint8), mode=cv2.RETR_LIST, method=cv2.CHAIN_APPROX_SIMPLE)[:2]