Source code for molyso.generic.signal

# -*- coding: utf-8 -*-
"""
signal processing helper routines
"""

from __future__ import division, unicode_literals, print_function

import warnings

import numpy as np
import scipy.signal
import scipy.interpolate

from collections import namedtuple

from .smoothing import hamming_smooth
from .fft import *


[docs]def find_phase(signal_1=None, signal_2=None, fft_1=None, fft_2=None, return_1=False, return_2=False): """ Finds the phase (time shift) between two signals. Either signalX or fftX should be set; the FFTs can be returned in order to cache them locally... :param signal_1: first input signal :type signal_1: numpy.ndarray or None :param signal_2: second input signal :type signal_2: numpy.ndarray or None :param fft_1: first input fft :type fft_1: numpy.ndarray or None :param fft_2: second input fft :type fft_2: numpy.ndarray or None :param return_1: whether fft1 should be returned :type return_1: bool :param return_2: whether fft2 should be returned :type return_2: bool :return: (shift, (fft1 if return_1), (fft2 if return_2)) :rtype: tuple >>> find_phase(np.array([0, 1, 0, 0, 0]), np.array([0, 0, 0, 1, 0])) (2,) """ if signal_1 is not None and fft_1 is None: fft_1 = fft(signal_1) if signal_2 is not None and fft_2 is None: fft_2 = fft(signal_2) corr = ifft(fft_1 * -fft_2.conjugate()) corr = np.absolute(corr) the_max = np.argmax(corr) # if the_max > 2 and the_max < (len(corr) - 2): # sur = corr[the_max-1:the_max+2] # the_max += -0.5*sur[0] + 0.5*sur[2] the_max = -the_max if the_max < len(fft_1) / 2 else len(fft_1) - the_max result = (the_max,) if return_1: result += (fft_1,) if return_2: result += (fft_2,) return result
[docs]class ExtremeAndProminence(namedtuple('ExtremeAndProminence', ['maxima', 'minima', 'signal', 'order', 'max_spline', 'min_spline', 'xpts', 'max_spline_points', 'min_spline_points', 'prominence'])): """ Result of `find_extrema_and_prominence` call. :var maxima: :var minima: :var signal: :var order: :var max_spline: :var min_spline: :var xpts: :var max_spline_points: :var min_spline_points: :var prominence: """
def _dummy_max_spline(arg): """ :param arg: :return: """ arg = np.zeros_like(arg) arg[:] = float('Inf') return arg def _dummy_min_spline(arg): """ :param arg: :return: """ arg = np.zeros_like(arg) arg[:] = float('-Inf') return arg
[docs]def find_extrema_and_prominence(signal, order=5): """ Generates various extra information / signals. :param signal: input signal :type signal: numpy.ndarray :param order: relative minima/maxima order, see other functions :type order: int :return: an ExtremeAndProminence object with various information members :rtype: ExtremeAndProminence >>> result = find_extrema_and_prominence(np.array([1, 2, 3, 2, 1, 0, 1, 2, 15, 2, -15, 2, 1]), 2) >>> result = result._replace(max_spline=None, min_spline=None) # just for doctests >>> result ExtremeAndProminence(maxima=array([2, 8]), minima=array([ 5, 10]), signal=array([ 1, 2, 3, 2, 1, 0, 1, 2, 15, 2, -15, 2, 1]), order=2, max_spline=None, min_spline=None, xpts=array([ 0., 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11., 12.]), max_spline_points=array([ 3. , 2.4875, 3. , 4.3125, 6.2 , 8.4375, 10.8 , 13.0625, 15. , 16.3875, 17. , 16.6125, 15. ]), min_spline_points=array([-9.73055091e-16, 3.38571429e+00, 4.71428571e+00, 4.35000000e+00, 2.65714286e+00, 5.21804822e-15, -3.25714286e+00, -6.75000000e+00, -1.01142857e+01, -1.29857143e+01, -1.50000000e+01, -1.57928571e+01, -1.50000000e+01]), prominence=array([ 3. , -0.89821429, -1.71428571, -0.0375 , 3.54285714, 8.4375 , 14.05714286, 19.8125 , 25.11428571, 29.37321429, 32. , 32.40535714, 30. ])) """ # we are FORCING some kind of result here, although it might be meaningless maxima = np.array([np.argmax(signal)]) iorder = order while iorder > 0: try: maxima = relative_maxima(signal, order=iorder) except ValueError: iorder -= 1 continue break if len(maxima) == 0: maxima = np.array([np.argmax(signal)]) if len(maxima) == 1 and (maxima[0] == 0 or maxima[0] == len(signal) - 1): maxima = [] minima = np.array([np.argmin(signal)]) iorder = order while iorder > 0: try: minima = relative_minima(signal, order=iorder) except ValueError: iorder -= 1 continue break if len(minima) == 0: minima = np.array([np.argmin(signal)]) if len(minima) == 1 and (minima[0] == 0 or minima[0] == len(signal) - 1): minima = [] maximaintpx = np.zeros(len(maxima) + 2) maximaintpy = np.copy(maximaintpx) minimaintpx = np.zeros(len(minima) + 2) minimaintpy = np.copy(minimaintpx) maximaintpx[0] = 0 maximaintpx[1:-1] = maxima[:] maximaintpx[-1] = len(signal) - 1 signal_maxima = signal[maxima] if len(signal_maxima) > 0: maximaintpy[0] = signal_maxima[0] maximaintpy[1:-1] = signal_maxima[:] maximaintpy[-1] = signal_maxima[-1] minimaintpx[0] = 0 minimaintpx[1:-1] = minima[:] minimaintpx[-1] = len(signal) - 1 signal_minima = signal[minima] if len(signal_minima) > 0: minimaintpy[0] = signal_minima[0] minimaintpy[1:-1] = signal_minima[:] minimaintpy[-1] = signal_minima[-1] k = 3 if len(maximaintpy) <= 3: k = len(maximaintpy) - 1 if k < 1: max_spline = _dummy_max_spline else: with warnings.catch_warnings(): warnings.simplefilter('ignore') max_spline = scipy.interpolate.UnivariateSpline(maximaintpx, maximaintpy, bbox=[0, len(signal)], k=k) k = 3 if len(minimaintpy) <= 3: k = len(minimaintpy) - 1 if k < 1: min_spline = _dummy_min_spline else: with warnings.catch_warnings(): warnings.simplefilter('ignore') min_spline = scipy.interpolate.UnivariateSpline(minimaintpx, minimaintpy, bbox=[0, len(signal)], k=k) xpts = np.linspace(0, len(signal) - 1, len(signal)) maxsy = max_spline(xpts) minsy = min_spline(xpts) return ExtremeAndProminence(maxima, minima, signal, order, max_spline, min_spline, xpts, maxsy, minsy, maxsy - minsy)
[docs]def simple_baseline_correction(signal, window_width=None): """ Performs a simple baseline correction by subtracting a strongly smoothed version of the signal from itself. :param signal: input signal :param window_width: smoothing window width :return: >>> simple_baseline_correction(np.array([10, 11, 12, 11, 10])) array([-1. , 0.375 , 1. , -0.375 , -0.96428571]) """ if window_width is None or window_width > len(signal): window_width = len(signal) return signal - hamming_smooth(signal, window_width, no_cache=True)
[docs]def vertical_mean(image): """ Calculates the vertical mean of an image. Note: Image is assumed HORIZONTAL x VERTICAL. :param image: :return: >>> vertical_mean(np.array([[ 1, 2, 3, 4], ... [ 5, 6, 7, 8], ... [ 9, 10, 11, 12], ... [13, 14, 15, 16]])) array([ 2.5, 6.5, 10.5, 14.5]) """ return np.mean(image, axis=1)
[docs]def horizontal_mean(image): """ Calculates the horizontal mean of an image. Note: Image is assumed HORIZONTAL x VERTICAL. :param image: input image :type image: numpy.ndarray :return: >>> horizontal_mean(np.array([[ 1, 2, 3, 4], ... [ 5, 6, 7, 8], ... [ 9, 10, 11, 12], ... [13, 14, 15, 16]])) array([ 7., 8., 9., 10.]) """ return np.mean(image, axis=0)
[docs]def relative_maxima(signal, order=1): """ :param signal: :param order: :return: >>> relative_maxima(np.array([1, 2, 3, 2, 1, 0, 1, 2, 15, 2, -15, 2, 1]), 2) array([2, 8]) """ value, = scipy.signal.argrelmax(signal, order=order) return value
[docs]def relative_minima(signal, order=1): """ :param signal: :param order: :return: >>> relative_minima(np.array([1, 2, 3, 2, 1, 0, 1, 2, 15, 2, -15, 2, 1]), 2) array([ 5, 10]) """ value, = scipy.signal.argrelmin(signal, order=order) return value
[docs]def normalize(data): """ normalizes the values in arr to 0 - 1 :param data: input array :return: normalized array >>> normalize(np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])) array([0. , 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1. ]) >>> normalize(np.array([10, 15, 20])) array([0. , 0.5, 1. ]) """ result = data.astype(float) result -= result.min() result /= result.max() return result
# noinspection PyUnresolvedReferences
[docs]def fit_to_type(image, new_dtype): """ :param image: :param new_dtype: :return: >>> fit_to_type(np.array([-7, 4, 18, 432]), np.uint8) array([ 0, 6, 14, 255], dtype=uint8) >>> fit_to_type(np.array([-7, 4, 18, 432]), np.int8) array([-128, -121, -113, 127], dtype=int8) >>> fit_to_type(np.array([-7, 4, 18, 432]), np.bool) array([False, False, False, True]) >>> fit_to_type(np.array([-7, 4, 18, 432]), np.float32) array([ -7., 4., 18., 432.], dtype=float32) """ scaled_img = normalize(image) to_type = np.dtype(new_dtype) if to_type.kind == 'f': return image.astype(to_type) elif to_type.kind == 'u': return (scaled_img * (2 ** (to_type.itemsize * 8) - 1)).astype(to_type) elif to_type.kind == 'i': return (scaled_img * (2 ** (to_type.itemsize * 8) - 1) - (2 ** (to_type.itemsize * 8 - 1))).astype(to_type) elif to_type.kind == 'b': return scaled_img > 0.5 else: raise TypeError("Unsupported new_dtype value used for rescale_and_fit_to_type used!")
# TODO: Improve this function!
[docs]def threshold_outliers(data, times_std=2.0): """ removes outliers :param data: :param times_std: :return: >>> threshold_outliers(np.array([10, 9, 11, 40, 8, 12, 14, 7]), times_std=1.0) array([10, 9, 11, 20, 8, 12, 14, 7]) """ data = data.copy() median = np.median(data) std = np.std(data) data[(data - median) > (times_std * std)] = median + (times_std * std) data[((data - median) < 0) & (abs(data - median) > times_std * std)] = median - (times_std * std) return data
[docs]def outliers(data, times_std=2.0): """ :param data: :param times_std: :return: >>> outliers(np.array([10, 9, 11, 40, 8, 12, 14, 7]), times_std=1.0) array([False, False, False, True, False, False, False, False]) """ return np.absolute(data - np.median(data)) > (times_std * np.std(data))
[docs]def remove_outliers(data, times_std=2.0): """ :param data: :param times_std: :return: >>> remove_outliers(np.array([10, 9, 11, 40, 8, 12, 14, 7]), times_std=1.0) array([10, 9, 11, 8, 12, 14, 7]) """ try: return data[~outliers(data, times_std)] except TypeError: return data
[docs]def find_insides(signal): """ :param signal: :return: >>> find_insides(np.array([False, False, True, True, True, False, False, True, True, False, False])) array([[2, 5], [7, 9]]) """ # had a nicer numpy using solution ... which failed in some cases ... # plain, but works. pairs = [] last_true = None for n, i in enumerate(signal): if i and not last_true: last_true = n elif not i and last_true: pairs.append([last_true, n]) last_true = None if last_true: pairs.append([last_true, len(signal)-1]) return np.array(pairs)
[docs]def one_every_n(length, n=1, shift=0): """ :param length: :param n: :param shift: :return: >>> one_every_n(10, n=2, shift=0) array([1., 0., 1., 0., 1., 0., 1., 0., 1., 0.]) >>> one_every_n(10, n=2, shift=1) array([0., 1., 0., 1., 0., 1., 0., 1., 0., 1.]) >>> one_every_n(10, n=1, shift=0) array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.]) """ # major regression here? # don't use np.arange(a,b,c, dtype=np.int32) !!! signal = np.zeros(int(length)) indices = np.around(np.arange(shift % n, length, n)).astype(np.int32) indices = indices[indices < signal.size] signal[indices] = 1 return signal
[docs]def each_image_slice(image, steps, direction='vertical'): """ :param image: :param steps: :param direction: :return: >>> list(each_image_slice(np.ones((4, 4,)), 2, direction='vertical')) [(0, 2, array([[1., 1.], [1., 1.], [1., 1.], [1., 1.]])), (1, 2, array([[1., 1.], [1., 1.], [1., 1.], [1., 1.]]))] >>> list(each_image_slice(np.ones((4, 4,)), 2, direction='horizontal')) [(0, 2, array([[1., 1., 1., 1.], [1., 1., 1., 1.]])), (1, 2, array([[1., 1., 1., 1.], [1., 1., 1., 1.]]))] """ if direction == 'vertical': step = image.shape[1] // steps for n in range(steps): yield n, step, image[:, step * n:step * (n + 1)] elif direction == 'horizontal': step = image.shape[0] // steps for n in range(steps): yield n, step, image[step * n:step * (n + 1), :] else: raise ValueError("Unknown direction passed.")