Source code for molyso.debugging.debugplot

# -*- coding: utf-8 -*-
"""
documentation
"""
from __future__ import division, unicode_literals, print_function

import os
import atexit

from os.path import isfile
from tempfile import TemporaryFile
from functools import partial

from .callserialization import CallSerialization


[docs]def next_free_filename(prefix, suffix): """ :param prefix: :param suffix: :return: :raise IOError: """ n = 0 while isfile(prefix + '%04d' % (n,) + suffix): n += 1 if n > 9999: raise IOError('No free filename found.') return prefix + '%04d' % (n,) + suffix
[docs]def poly_drawing_helper(p, coordinates, **kwargs): """ :param p: :param coordinates: :param kwargs: """ gca = p.gca() from matplotlib.path import Path from matplotlib.patches import PathPatch actions = [Path.MOVETO] + [Path.LINETO] * (len(coordinates) - 1) if 'closed' in kwargs: if kwargs['closed']: actions.append(Path.CLOSEPOLY) coordinates.append((0, 0)) del kwargs['closed'] gca.add_patch(PathPatch(Path(coordinates, actions), **kwargs))
[docs]def inject_poly_drawing_helper(p): """ :param p: """ p.poly_drawing_helper = partial(poly_drawing_helper, p)
[docs]class DebugPlotInterruptException(Exception): """ Only for internal usage. Used to interrupt plot drawing early if it is disabled. """ pass
[docs]class DebugPlotInterruptThrower(object): """ Dummy object which raises an exception on every call. To be used when debug mode is deactivated. """ def __getattr__(self, item): raise DebugPlotInterruptException()
[docs]class DebugPlot(object): """ The DebugPlot class serves as an switchable abstraction layer to add plotting debug output facilities. """ default_config = { 'figure.figsize': (12, 8), 'figure.dpi': 150, 'image.cmap': 'gray' } file_prefix = 'debug' active = True force_active = False post_figure = 'close' individual_and_merge = False individual_files = False individual_file_prefix = file_prefix file_suffix = '.pdf' throw_on_anything = True pp = None context = '' exp_plot_debugging = False # True diverted_outputs = {} files_to_merge = [] exit_handlers = [] exit_handler_registered = False
[docs] @classmethod def set_context(cls, **kwargs): """ :param kwargs: """ cls.context = ' '.join(["%s=%s" % x for x in sorted(kwargs.items())])
[docs] @classmethod def get_context(cls): """ :return: """ return cls.context
[docs] @classmethod def pdfopener(cls, filename): """ Opens a new PdfPages output, ensuring it will be closed at exit. :param filename: filename :return: """ from matplotlib.backends.backend_pdf import PdfPages try: newoutput = PdfPages(filename) except IOError: basename, pdf = os.path.splitext(filename) filename = next_free_filename(basename, pdf) newoutput = PdfPages(filename) # if it throws now, we won't care def _close_at_exit(): try: newoutput.close() except AttributeError: pass cls.exit_handlers.append(_close_at_exit) return newoutput
@classmethod def _call_exit_handlers(cls): # perform cleanup, either explicitly, # or by atexit at the end (__init__ registers this function) """ """ for handler in cls.exit_handlers: handler() cls.exit_handlers = [] cls.exit_handler_registered = False
[docs] @classmethod def new_pdf_output(cls, filename, collected): """ :param filename: :param collected: """ newoutput = cls.pdfopener(filename) cls.diverted_outputs[newoutput] = collected
def __init__(self, *args, **kwargs): """ Creates an DebugPlot instance :param info: An additional information about the plot, currently shown in the title :return: """ self.info = '' if 'info' in kwargs: self.info = kwargs['info'] # Note: DebugPlot had a sister class 'Debug' which took care of filtering # this was however not used in molyso. In a future rewrite, DebugPlot might be attached # more to the Python included log system / its filter capabilities... # self.filter_okay = Debug.filter(*args) self.filter_str = '.'.join([w.lower() for w in args]) # Debug.filter_to_str(args) self.active = self.force_active # or (self.active and # self.filter_okay and # (Debug.is_enabled('plot') or Debug.is_enabled('plot_pdf'))) if not DebugPlot.exit_handler_registered: atexit.register(DebugPlot._call_exit_handlers) if self.active: from matplotlib import pylab self.pylab = pylab self.call_serialization = CallSerialization() if DebugPlot.individual_and_merge: try: # noinspection PyPackageRequirements,PyUnresolvedReferences import PyPDF2 DebugPlot.individual_files = True except ImportError: DebugPlot.individual_and_merge = False # if Debug.is_enabled('plot_pdf'): if self.active: if not DebugPlot.individual_files: if DebugPlot.pp is None: DebugPlot.pp = self.__class__.pdfopener('debug.pdf') def __enter__(self): # if self.active: # # noinspection PyPep8Naming,PyAttributeOutsideInit if self.active: return self.call_serialization.get_proxy() else: return DebugPlotInterruptThrower() # noinspection PyUnusedLocal def __exit__(self, exc_type, exc_val, exc_tb): if exc_type == DebugPlotInterruptException: return True if self.active: p = self.pylab p.rcParams.update(self.default_config) p.figure() p.text(0.01, 0.01, "%s\n%s\n%s" % (self.info, self.get_context(), self.filter_str), transform=p.gca().transAxes) inject_poly_drawing_helper(p) self.call_serialization.execute(p) if not DebugPlot.individual_files: if DebugPlot.pp: p.savefig(DebugPlot.pp, format='pdf') else: if DebugPlot.individual_and_merge: p.savefig( self.get_file_for_merge(), format='pdf') else: p.savefig( next_free_filename(DebugPlot.individual_file_prefix, DebugPlot.file_suffix), format='pdf') for pp, okay in DebugPlot.diverted_outputs.items(): if self.filter_str in okay: p.savefig(pp, format='pdf') if self.post_figure == 'show': p.show() else: p.close()
[docs] @classmethod def get_file_for_merge(cls): """ :return: """ if len(cls.files_to_merge) == 0: def _merge_exit_handler(): # noinspection PyPackageRequirements,PyUnresolvedReferences import PyPDF2 with open(next_free_filename(cls.file_prefix, cls.file_suffix), 'wb+') as pdf_file: pdf = PyPDF2.PdfFileMerger() for individual_file in cls.files_to_merge: individual_file.seek(0) individual_pdf = PyPDF2.PdfFileReader(individual_file) pdf.append(individual_pdf) pdf.write(pdf_file) cls.files_to_merge = [] cls.exit_handlers.append(_merge_exit_handler) f = TemporaryFile() cls.files_to_merge.append(f) return f