# -*- coding: utf-8 -*-
"""
This module contains cell tracking infrastructure.
"""
from __future__ import division, unicode_literals, print_function
import numpy as np
class CellTracker(object):
    """
    The CellTracker contains all tracks of a channel.

    It holds a mapping from every registered observation to the
    :class:`TrackedCell` it belongs to, the list of origin cells and a
    counter of the timepoints ticked so far.
    """

    __slots__ = ['all_tracked_cells', 'origins', 'timepoints']

    def __init__(self):
        # observation -> TrackedCell; entries are added by TrackedCell.add_observation
        self.all_tracked_cells = {}
        # cells created via new_origin() / new_observed_origin()
        self.origins = []
        # number of times tick() has been called
        self.timepoints = 0

    def tick(self):
        """
        Ticks the clock. Sets the internal timepoint counter forward by one.
        """
        self.timepoints += 1

    @property
    def average_cells(self):
        """
        Returns the average count of cells present in this tracked channel.

        Computed as the number of registered observations divided by the
        number of ticked timepoints.

        :return: the average as a float, or ``0.0`` if no timepoint was ticked yet
        """
        if self.timepoints > 0:
            return float(len(self.all_tracked_cells)) / self.timepoints
        else:
            return 0.0

    def new_cell(self):
        """
        Creates a new TrackedCell object associated with this tracker.

        :return: the newly created TrackedCell
        """
        return TrackedCell(self)

    def new_observed_cell(self, where):
        """
        Creates a new TrackedCell object, with added observation.

        :param where: the observation to add to the new cell
        :return: the result of ``add_observation`` on the new cell
        """
        return self.new_cell().add_observation(where)

    def new_origin(self):
        """
        Creates a new TrackedCell object and adds it as an origin.

        :return: the newly created TrackedCell
        """
        t = self.new_cell()
        self.origins.append(t)
        return t

    def new_observed_origin(self, where):
        """
        Creates a new TrackedCell object and adds it as an origin, with added observation.

        :param where: the observation to add to the new origin cell
        :return: the result of ``add_observation`` on the new origin cell
        """
        return self.new_origin().add_observation(where)

    def is_tracked(self, cell):
        """
        Returns whether the cell is tracked.

        :param cell: the observation to look up
        :return: ``True`` if the observation is a key of ``all_tracked_cells``
        """
        return cell in self.all_tracked_cells

    def get_cell_by_observation(self, where):
        """
        Returns the associated cell by its observation.

        :param where: the observation to look up
        :return: the value stored for that observation in ``all_tracked_cells``
        :raises KeyError: if the observation was never registered
        """
        return self.all_tracked_cells[where]
class TrackedCell(object):
    """
    A single tracked cell: a node in a parent/children tree that collects
    its observations and the rates derived from consecutive observations.

    :param tracker: the tracker object; its ``all_tracked_cells`` mapping is
        updated whenever an observation is added to this cell
    """

    __slots__ = ['tracker', 'parent', 'children', 'seen_as', 'raw_elongation_rates', 'raw_trajectories']

    def __init__(self, tracker):
        self.tracker = tracker
        self.parent = None
        self.children = []
        # observations of this cell, in the order they were added
        self.seen_as = []
        # both rate lists start with 0.0 for the first observation,
        # which has no predecessor to compare against
        self.raw_elongation_rates = [0.0]
        self.raw_trajectories = [0.0]

    @property
    def ultimate_parent(self):
        """
        Follows the ``parent`` links up to the cell that has no parent.

        Implemented as a loop so that very long lineages do not hit the
        interpreter's recursion limit.

        :return: the topmost ancestor, or this cell itself if it has no parent
        """
        cell = self
        while cell.parent is not None:
            cell = cell.parent
        return cell

    @property
    def elongation_rates(self):
        """
        Elongation rates of this cell only; values of the parent are
        not prepended.

        :return: the ``raw_elongation_rates`` list (not a copy)
        """
        return self.raw_elongation_rates

    @property
    def trajectories(self):
        """
        Trajectory values of this cell only; values of the parent are
        not prepended.

        :return: the ``raw_trajectories`` list (not a copy)
        """
        return self.raw_trajectories

    def add_child(self, tcell):
        """
        Registers a cell as child of this cell and sets its parent link.

        :param tcell: the TrackedCell to become a child
        :return: this cell, to allow call chaining
        """
        tcell.parent = self
        self.children.append(tcell)
        return self

    def add_children(self, *children):
        """
        Registers several cells as children of this cell.

        :param children: the TrackedCell objects to become children
        :return: this cell, to allow call chaining (consistent with ``add_child``)
        """
        for child in children:
            self.add_child(child)
        return self

    def add_observation(self, cell):
        """
        Appends an observation to this cell and registers it with the tracker.

        From the second observation on, the change in ``length`` and in
        ``centroid_1d`` relative to the previous observation, each divided
        by the difference of the observations' ``channel.image.timepoint``,
        is appended to the raw elongation rates and raw trajectories.

        :param cell: the observation to add
        :return: this cell, to allow call chaining
        :raises AssertionError: if the observation equals the previous one
        :raises ZeroDivisionError: if both observations share the same timepoint
        """
        self.seen_as.append(cell)
        self.tracker.all_tracked_cells[cell] = self

        if len(self.seen_as) > 1:
            current = self.seen_as[-1]
            previous = self.seen_as[-2]

            assert (current != previous)

            # the time difference is shared by both rates, compute it once
            elapsed = current.channel.image.timepoint - previous.channel.image.timepoint

            self.raw_elongation_rates.append((current.length - previous.length) / elapsed)
            self.raw_trajectories.append((current.centroid_1d - previous.centroid_1d) / elapsed)

        return self
def to_list(x):
    """
    Wraps a value in a list unless it already is one.

    :param x: any object
    :return: ``x`` itself if it is a list (including list subclasses),
        otherwise a new one-element list ``[x]``
    """
    # isinstance instead of an exact type comparison, so that list
    # subclasses are passed through rather than wrapped a second time
    return x if isinstance(x, list) else [x]
class CellCrossingCheckingGlobalDuoOptimizerQueue(object):
    """
    Collects possible outcomes, each with a cost and the two sets of items
    it involves, and keeps the union of all involved items per side.
    """

    def __init__(self):
        # union of all ``involved_a`` / ``involved_b`` sets added so far
        self.set_a = set()
        self.set_b = set()
        # list of (cost, (involved_a, involved_b, what)) tuples
        self.data = []
        # NOTE(review): ``run`` and ``debug_output`` are only initialised
        # here; their use is not visible in this part of the file
        self.run = []
        self.debug_output = ''

    def add_outcome(self, cost, involved_a, involved_b, what):
        """
        Adds a possible outcome to the queue. Outcomes with a NaN cost are
        silently ignored.

        :param cost: the cost of the outcome
        :param involved_a: set of items involved on side a
        :param involved_b: set of items involved on side b
        :param what: payload stored alongside the outcome
        :return: None
        """
        # nan check: NaN is the only value which is not equal to itself
        if cost != cost:
            return

        self.data.append((cost, (involved_a, involved_b, what)))
        self.set_a |= involved_a
        self.set_b |= involved_b