# -*- coding: utf-8 -*-
"""
documentation
"""
from __future__ import division, unicode_literals, print_function
import sys
import warnings
import numpy as np
from ..debugging import DebugPlot
from ..generic.signal import find_phase, find_extrema_and_prominence, spectrum_fourier, spectrum_bins_by_length,\
hires_power_spectrum, vertical_mean, horizontal_mean, normalize, threshold_outliers, find_insides, one_every_n,\
hamming_smooth, each_image_slice
from .cell_detection import Cells
from ..generic.etc import NotReallyATree
from ..generic.tunable import tunable
[docs]class Channel(object):
"""
:param image:
:param left:
:param right:
:param top:
:param bottom:
"""
cells_type = Cells
__slots__ = ['image', 'left', 'right', 'real_top', 'real_bottom', 'putative_orientation', 'cells', 'channel_image']
def __init__(self, image, left, right, top, bottom):
self.image = image
self.left = float(left)
self.right = float(right)
self.real_top = float(top)
self.real_bottom = float(bottom)
self.putative_orientation = 0
self.cells = None
if self.image.image is not None:
self.channel_image = self.crop_out_of_image(self.image.image)
@property
def top(self):
"""
:return:
"""
return self.real_top + self.image.shift[0]
@property
def bottom(self):
"""
:return:
"""
return self.real_bottom + self.image.shift[0]
@property
def centroid(self):
"""
:return:
"""
return (self.left + self.right) / 2, (self.top + self.bottom) / 2
[docs] def crop_out_of_image(self, image):
"""
:param image:
:return:
"""
return image[int(self.real_top):int(self.real_bottom), int(self.left):int(self.right)].copy()
[docs] def get_coordinates(self):
"""
:return:
"""
return [[self.left, self.bottom], [self.right, self.bottom], [self.right, self.top], [self.left, self.top],
[self.left, self.bottom]]
[docs] def detect_cells(self):
"""
Performs Cell detection (by instantiating a Cells object).
"""
self.cells = self.__class__.cells_type(self)
[docs] def clean(self):
"""
Peforms clean up routines.
"""
self.cells.clean()
if not self.image.keep_channel_image:
self.channel_image = None
treeprovider = NotReallyATree
# treeprovider = KDTree
# treeprovider = ckdtree.cKDTree
[docs]class Channels(object):
"""
docstring
"""
__slots__ = ['channels_list', 'image', 'nearest_tree']
channel_type = Channel
def __init__(self, image, bootstrap=True):
self.channels_list = []
self.image = image
self.nearest_tree = None
if not bootstrap:
return
find_channels_function = find_channels
if hasattr(self.image, 'find_channels_function'):
find_channels_function = self.image.find_channels_function
positions, (upper, lower) = find_channels_function(self.image.image)
for begin, end in positions:
self.channels_list.append(self.__class__.channel_type(self.image, begin, end, upper, lower))
def __len__(self):
return len(self.channels_list)
def __iter__(self):
return iter(self.channels_list)
[docs] def clean(self):
"""
Performs clean up routines.
"""
for channel in self:
channel.clean()
@property
def centroids(self):
"""
:return:
"""
return [chan.centroid for chan in self]
[docs] def find_nearest(self, pos):
"""
:param pos:
:return:
"""
if self.nearest_tree is None:
self.nearest_tree = treeprovider(self.centroids)
return self.nearest_tree.query(pos)
[docs] def align_with_and_return_indices(self, other_channels):
"""
:param other_channels:
:return:
"""
if len(other_channels) == 0 or len(self) == 0:
return []
return [[ind, other_channels.find_nearest(cen)[1]] for ind, cen in enumerate(self.centroids)]
[docs]def horizontal_channel_detection(image):
"""
@param image:
@return:
"""
# nothing found is a helper variable, which will be returned in case.
nothing_found = ([], 0, 0, 0, 0, -1, )
profile = horizontal_mean(image)
profile_diff = np.diff(profile)
upper_profile = +(profile_diff * (profile_diff > 0))
lower_profile = -(profile_diff * (profile_diff < 0))
upper_profile[
upper_profile < upper_profile.max() *
tunable(
'channels.horizontal.noise_suppression_range.upper', 0.5,
description="For channel detection, upper profile, noise reduction, reduction range."
)
] *= tunable('channels.horizontal.noise_suppression_factor.upper', 0.1,
description="For channel detection, upper profile, noise reduction, reduction factor.")
lower_profile[
lower_profile < lower_profile.max() *
tunable('channels.horizontal.noise_suppression_range.lower', 0.5,
description="For channel detection, lower profile, noise reduction, reduction range.")
] *= tunable('channels.horizontal.noise_suppression_factor.lower', 0.1,
description="For channel detection, lower profile, noise reduction, reduction factor.")
with DebugPlot('channel_detection', 'details', 'differences') as p:
p.title("Channel detection/Differences")
p.plot(upper_profile)
p.plot(lower_profile)
# oversample the fft n-times
n = tunable('channels.horizontal.fft_oversampling', 8,
description="For channel detection, FFT oversampling factor.")
def calc_bins_freqs_main(the_profile):
"""
:param the_profile:
:return:
"""
frequencies, fourier_value = hires_power_spectrum(the_profile, oversampling=n)
fourier_value = hamming_smooth(fourier_value, tunable('channels.horizontal.fourier_smoothing', 3,
description="For channel detection, smoothing width for the spectrum."))
return frequencies, fourier_value, frequencies[np.argmax(fourier_value)]
# get the power spectra of the two signals
frequencies_upper, fourier_value_upper, mainfrequency_upper = calc_bins_freqs_main(upper_profile)
frequencies_lower, fourier_value_lower, mainfrequency_lower = calc_bins_freqs_main(lower_profile)
upper_profile = hamming_smooth(upper_profile,
tunable('channels.horizontal.profile_smoothing_width.upper', 5,
description="For channel detection, upper profile, smoothing window width."))
lower_profile = hamming_smooth(lower_profile,
tunable('channels.horizontal.profile_smoothing_width.lower', 5,
description="For channel detection, lower profile, smoothing window width."))
with DebugPlot('channel_detection', 'details', 'powerspectra', 'upper') as p:
p.title("Powerspectrum (upper)")
p.semilogx(frequencies_upper, fourier_value_upper)
p.title("main_frequency=%f" % mainfrequency_upper)
with DebugPlot('channel_detection', 'details', 'powerspectra', 'lower') as p:
p.title("Powerspectrum (lower)")
p.semilogx(frequencies_lower, fourier_value_lower)
p.title("main_frequency=%f" % mainfrequency_lower)
main_frequency = (mainfrequency_upper + mainfrequency_lower) / 2
if main_frequency == 0.0:
return nothing_found
maximum_channel_count = profile.size / main_frequency
allowed_maximum_channel_count = tunable(
'channels.horizontal.channel_count.max', 50,
description="For channel detection, maximum allowed channels to be detected.")
allowed_minimum_channel_count = tunable(
'channels.horizontal.channel_count.min', 3,
description="For channel detection, minimum allowed channels to be detected.")
if maximum_channel_count > allowed_maximum_channel_count or maximum_channel_count < allowed_minimum_channel_count:
return nothing_found
profile_diff_len = profile_diff.size
absolute_differentiated_profile = np.absolute(profile_diff)
width, = find_phase(upper_profile, lower_profile)
if width > main_frequency:
width = int(width % main_frequency)
elif width < 0:
width = int(width + main_frequency * np.ceil(abs(width / main_frequency)))
main_frequency += (width / (profile_diff_len / main_frequency))
preliminary_signal = \
one_every_n(profile_diff_len, main_frequency) + one_every_n(profile_diff_len, main_frequency, shift=width)
temporary_signal = np.zeros_like(absolute_differentiated_profile)
temporary_extrema = find_extrema_and_prominence(absolute_differentiated_profile, order=max(1, abs(width // 2)))
temporary_signal[temporary_extrema.maxima] = 1
phase, = find_phase(temporary_signal, preliminary_signal)
new_signal = \
one_every_n(profile_diff_len, main_frequency, shift=phase + 0.5 * width) + \
one_every_n(profile_diff_len, main_frequency, shift=phase + 1.5 * width)
# dependence on 'new_signal' removed, works apparently without
help_signal = normalize(hamming_smooth(absolute_differentiated_profile, 50)) # * new_signal
# under certain conditions, the help signal may contain a totally extreme
# maximum (test image), I guess it's wiser to remove it
help_signal = threshold_outliers(help_signal)
threshold_factor = tunable('channels.horizontal.threshold_factor', 0.2,
description="For channel detection, threshold factor for l/r border determination.")
min_val, max_val = help_signal.min(), help_signal.max()
help_signal = help_signal > (max_val - min_val) * threshold_factor + min_val
remaining_phase_shift, = find_phase(new_signal, absolute_differentiated_profile)
new_signal = \
one_every_n(profile_diff_len, main_frequency, shift=remaining_phase_shift + phase + 0.5 * width) + \
one_every_n(profile_diff_len, main_frequency, shift=remaining_phase_shift + phase + 1.5 * width)
try:
left = np.where(help_signal)[0][0]
except IndexError:
left = 0
try:
# noinspection PyUnresolvedReferences
right = help_signal.size - np.where(help_signal[::-1])[0][0]
except IndexError:
# noinspection PyUnresolvedReferences
right = help_signal.size
left, right = int(left), int(right)
new_signal[:left] = 0
new_signal[right:] = 0
positions, = np.where(new_signal)
if len(positions) % 2 == 1:
# either there's an additional line on the left or on the right
# lets try to find it ...
if abs(abs(positions[-1] - positions[-2]) - width) < 1:
positions = positions[1:]
elif abs(abs(positions[0] - positions[1]) - width) < 1:
positions = positions[:-1]
else:
return nothing_found
positions = positions.reshape((len(positions) // 2, 2))
times = int((right - left) / main_frequency)
# we worked with a simple difference, which made our signal shorter by one
# add 1 to the appropriate variables to match positions with the original image
positions += 1
left, right = left + 1, right + 1
return positions, left, right, width, times, main_frequency,
# TODO fix the proper one, or merge them, or document this here, or use just the new one
[docs]def alternate_vertical_channel_region_detection(image):
"""
:param image:
:return:
"""
f = spectrum_bins_by_length(image.shape[1])
ft_h_s = tunable('channels.vertical.alternate.fft_smoothing_width', 3,
description="For channel detection (alternate, vertical), spectrum smoothing width.")
def horizontal_mean_frequency(img_frag, clean_around=None, clean_width=0.0):
"""
:param img_frag:
:param clean_around:
:param clean_width:
:return:
"""
ft = np.absolute(spectrum_fourier(horizontal_mean(img_frag)))
ft /= 0.5 * ft[0]
ft[0] = 0
ft = hamming_smooth(ft, ft_h_s)
if clean_around:
ft[np.absolute(f - clean_around) > clean_width] = 0.0
return ft.max(), f[np.argmax(ft)]
split_factor = tunable('channels.vertical.alternate.split_factor', 60,
description="For channel detection (alternate, vertical), split factor.")
collector = np.zeros(image.shape[0])
for n, the_step, image_slice in each_image_slice(image, split_factor, direction='horizontal'):
power_local_f, local_f = horizontal_mean_frequency(image_slice)
collector[n*the_step:(n+1)*the_step] = local_f
np.set_printoptions(threshold=sys.maxsize)
# print(collector)
int_collector = collector.astype(np.int32)
if (int_collector == 0).all():
warnings.warn(
"Apparently no channel region was detectable. If the images are flipped, try filename?rotate=<90|270>",
RuntimeWarning
)
return 0, len(int_collector)
bins = np.bincount(int_collector)
winner = np.argmax(bins[1:]) + 1
delta = tunable('channels.vertical.alternate.delta', 5,
description="For channel detection (alternate, vertical), acceptable delta.")
collector = (np.absolute(int_collector - winner) < delta) | (np.absolute(int_collector - 2*winner) < delta)
return sorted(find_insides(collector), key=lambda pair: pair[1] - pair[0], reverse=True)[0]
FIRST_CALL, FROM_TOP, FROM_BOTTOM = 0, -1, 1
[docs]def vertical_channel_region_detection(image):
"""
:param image:
:return:
"""
f = spectrum_bins_by_length(image.shape[1])
ft_h_s = tunable('channels.vertical.recursive.fft_smoothing_width', 3,
description="For channel detection (recursive, vertical), spectrum smoothing width.")
def horizontal_mean_frequency(img_frag, clean_around=None, clean_width=0.0):
"""
:param img_frag:
:param clean_around:
:param clean_width:
:return:
"""
ft = np.absolute(spectrum_fourier(horizontal_mean(img_frag)))
ft /= 0.5 * ft[0]
ft[0] = 0
ft = hamming_smooth(ft, ft_h_s)
if clean_around:
ft[np.absolute(f - clean_around) > clean_width] = 0.0
return ft.max(), f[np.argmax(ft)]
power_overall_f, overall_f = horizontal_mean_frequency(image)
d = tunable('channels.vertical.recursive.maximum_delta', 2.0,
description="For channel detection (recursive, vertical), maximum delta.")
power_min_quotient = tunable('channels.vertical.recursive.power_min_quotient', 0.005,
description="For channel detection (recursive, vertical), minimum power quotient")
break_condition = tunable('channels.vertical.recursive.break_condition', 2.0,
description="For channel detection (recursive, vertical), recursive break condition.")
current_clean_width = overall_f / 2.0
def matches(img_frag):
"""
:param img_frag:
:return:
"""
power_local_f, local_f = horizontal_mean_frequency(
img_frag, clean_around=overall_f, clean_width=current_clean_width)
return (abs(overall_f - local_f) < d) and ((power_local_f / power_overall_f) > power_min_quotient)
height = image.shape[0]
collector = np.zeros(height)
def recursive_check(top, bottom, orientation=FIRST_CALL):
"""
:param top:
:param bottom:
:param orientation:
:return:
"""
if (bottom - top) < break_condition:
return
mid = (top + bottom) // 2
upper = matches(image[top:mid, :])
lower = matches(image[mid:bottom, :])
collector[top:mid] = upper
collector[mid:bottom] = lower
if orientation is FIRST_CALL:
if upper:
recursive_check(top, mid, FROM_TOP)
if lower:
recursive_check(mid, bottom, 1)
elif orientation is FROM_TOP:
if upper and lower:
recursive_check(top, mid, FROM_TOP)
elif not upper and lower:
recursive_check(mid, bottom, FROM_TOP)
elif orientation is FROM_BOTTOM:
if lower and upper:
recursive_check(mid, bottom, FROM_BOTTOM)
elif not lower and upper:
recursive_check(top, mid, FROM_BOTTOM)
recursive_check(0, height)
return sorted(find_insides(collector), key=lambda pair: pair[1] - pair[0], reverse=True)[0]
[docs]def find_channels(image):
"""
channel finder
:param image:
:return:
"""
method_to_use = tunable(
'channels.vertical.method',
'alternate',
description="For channel detection, vertical method to use (either alternate or recursive).")
if method_to_use == 'alternate':
upper, lower = alternate_vertical_channel_region_detection(image)
elif method_to_use == 'recursive':
upper, lower = vertical_channel_region_detection(image)
else:
raise RuntimeError("Tunable set to unsupported vertical channel detection method.")
profile = horizontal_mean(image)
with DebugPlot('channel_detection', 'details', 'overview', 'horizontal') as p:
p.title("Basic horizontal overview")
p.plot(profile)
with DebugPlot('channel_detection', 'details', 'overview', 'vertical') as p:
p.title("Basic vertical overview")
p.plot(vertical_mean(image))
positions, left, right, width, times, mainfreq = horizontal_channel_detection(image[upper:lower, :])
return positions, (upper, lower)