Source code for holoviews.selection

from collections import namedtuple

import numpy as np
import param
from param.parameterized import bothmethod

from .core.data import Dataset
from .core.element import Element, Layout
from .core.layout import AdjointLayout
from .core.options import CallbackError, Store
from .core.overlay import NdOverlay, Overlay
from .core.spaces import GridSpace
from .streams import (
    CrossFilterSet,
    Derived,
    Pipe,
    PlotReset,
    SelectionExprSequence,
    SelectMode,
    Stream,
)
from .util import DynamicMap


class _Cmap(Stream):
    cmap = param.Parameter(default=None, allow_None=True, constant=True)


class _SelectionExprOverride(Stream):
    selection_expr = param.Parameter(default=None, constant=True, doc="""
            dim expression of the current selection override""")


class _SelectionExprLayers(Derived):
    exprs = param.List(constant=True)

    def __init__(self, expr_override, cross_filter_set, **params):
        super().__init__(
            [expr_override, cross_filter_set], exclusive=True, **params
        )

    @classmethod
    def transform_function(cls, stream_values, constants):
        override_expr_values = stream_values[0]
        cross_filter_set_values = stream_values[1]

        if override_expr_values.get('selection_expr', None) is not None:
            return {"exprs": [True, override_expr_values["selection_expr"]]}
        else:
            return {"exprs": [True, cross_filter_set_values["selection_expr"]]}


_Styles = Stream.define('Styles', colors=[], alpha=1.)
_RegionElement = Stream.define("RegionElement", region_element=None)


_SelectionStreams = namedtuple(
    'SelectionStreams',
    'style_stream exprs_stream cmap_streams '
)

class _base_link_selections(param.ParameterizedFunction):
    """
    Baseclass for linked selection functions.

    Subclasses override the _build_selection_streams class method to construct
    a _SelectionStreams namedtuple instance that includes the required streams
    for implementing linked selections.

    Subclasses also override the _expr_stream_updated method. This allows
    subclasses to control whether new selections override prior selections or
    whether they are combined with prior selections
    """

    link_inputs = param.Boolean(default=False, doc="""
        Whether to link any streams on the input to the output.""")

    show_regions = param.Boolean(default=True, doc="""
        Whether to highlight the selected regions.""")

    @bothmethod
    def instance(self_or_cls, **params):
        inst = super().instance(**params)

        # Init private properties
        inst._cross_filter_stream = CrossFilterSet(mode=inst.cross_filter_mode)
        inst._selection_override = _SelectionExprOverride()
        inst._selection_expr_streams = {}
        inst._plot_reset_streams = {}

        # Init selection streams
        inst._selection_streams = self_or_cls._build_selection_streams(inst)

        return inst

    def _update_mode(self, event):
        if event.new == 'replace':
            self.selection_mode = 'overwrite'
        elif event.new == 'append':
            self.selection_mode = 'union'
        elif event.new == 'intersect':
            self.selection_mode = 'intersect'
        elif event.new == 'subtract':
            self.selection_mode = 'inverse'

    def _register(self, hvobj):
        """
        Register an Element or DynamicMap that may be capable of generating
        selection expressions in response to user interaction events
        """
        from .element import Table

        # Create stream that produces element that displays region of selection
        selection_expr_seq = SelectionExprSequence(
            hvobj, mode=self.selection_mode,
            include_region=self.show_regions,
            index_cols=self.index_cols
        )
        self._selection_expr_streams[hvobj] = selection_expr_seq
        self._cross_filter_stream.append_input_stream(selection_expr_seq)

        self._plot_reset_streams[hvobj] = PlotReset(source=hvobj)

        # Register reset
        def clear_stream_history(resetting, stream=selection_expr_seq.history_stream):
            if resetting:
                stream.clear_history()
                stream.event()

        if not isinstance(hvobj, Table):
            mode_stream = SelectMode(source=hvobj)
            mode_stream.param.watch(self._update_mode, 'mode')

        self._plot_reset_streams[hvobj].param.watch(
            clear_stream_history, ['resetting']
        )

    def __call__(self, hvobj, **kwargs):
        # Apply kwargs as params
        self.param.update(**kwargs)

        if Store.current_backend not in Store.renderers:
            raise RuntimeError("Cannot perform link_selections operation "
                               "since the selected backend %r is not "
                               "loaded. Load the plotting extension with "
                               "hv.extension or import the plotting "
                               "backend explicitly." % Store.current_backend)

        # Perform transform
        return self._selection_transform(hvobj.clone())

    def _selection_transform(self, hvobj, operations=()):
        """
        Transform an input HoloViews object into a dynamic object with linked
        selections enabled.
        """
        from .plotting.util import initialize_dynamic
        if isinstance(hvobj, DynamicMap):
            callback = hvobj.callback
            if len(callback.inputs) > 1:
                return Overlay([
                    self._selection_transform(el) for el in callback.inputs
                ]).collate()

            initialize_dynamic(hvobj)
            if issubclass(hvobj.type, Element):
                self._register(hvobj)
                chart = Store.registry[Store.current_backend][hvobj.type]
                return chart.selection_display(hvobj).build_selection(
                    self._selection_streams, hvobj, operations,
                    self._selection_expr_streams.get(hvobj, None), cache=self._cache
                )
            elif (issubclass(hvobj.type, Overlay) and
                  getattr(hvobj.callback, "name", None) == "dynamic_mul"):
                return Overlay([
                    self._selection_transform(el, operations=operations)
                    for el in callback.inputs
                ]).collate()
            elif getattr(hvobj.callback, "name", None) == "dynamic_operation":
                obj = callback.inputs[0]
                return self._selection_transform(obj, operations=operations).apply(
                    callback.operation)
            else:
                # This is a DynamicMap that we don't know how to recurse into.
                self.param.warning(
                    "linked selection: Encountered DynamicMap that we don't know "
                    f"how to recurse into:\n{hvobj!r}"
                )
                return hvobj
        elif isinstance(hvobj, Element):
            # Register hvobj to receive selection expression callbacks
            chart = Store.registry[Store.current_backend][type(hvobj)]
            if getattr(chart, 'selection_display', None) is not None:
                element = hvobj.clone(link=self.link_inputs)
                self._register(element)
                return chart.selection_display(element).build_selection(
                    self._selection_streams, element, operations,
                    self._selection_expr_streams.get(element, None), cache=self._cache
                )
            return hvobj
        elif isinstance(hvobj, (Layout, Overlay, NdOverlay, GridSpace, AdjointLayout)):
            data = dict([(k, self._selection_transform(v, operations))
                                 for k, v in hvobj.items()])
            if isinstance(hvobj, NdOverlay):
                def compose(*args, **kwargs):
                    new = []
                    for k, v in data.items():
                        for i, el in enumerate(v[()]):
                            if i == len(new):
                                new.append([])
                            new[i].append((k, el))
                    return Overlay([hvobj.clone(n) for n in new])
                new_hvobj = DynamicMap(compose)
                new_hvobj.callback.inputs[:] = list(data.values())
            else:
                new_hvobj = hvobj.clone(data)
                if hasattr(new_hvobj, 'collate'):
                    new_hvobj = new_hvobj.collate()
            return new_hvobj
        else:
             # Unsupported object
            return hvobj

    @classmethod
    def _build_selection_streams(cls, inst):
        """
        Subclasses should override this method to return a _SelectionStreams
        instance
        """
        raise NotImplementedError()

    def _expr_stream_updated(self, hvobj, selection_expr, bbox, region_element, **kwargs):
        """
        Called when one of the registered HoloViews objects produces a new
        selection expression.  Subclasses should override this method, and
        they should use the input expression to update the `exprs_stream`
        property of the _SelectionStreams instance that was produced by
        the _build_selection_streams.

        Subclasses have the flexibility to control whether the new selection
        express overrides previous selections, or whether it is combined with
        previous selections.
        """
        raise NotImplementedError()





[docs]class SelectionDisplay: """ Base class for selection display classes. Selection display classes are responsible for transforming an element (or DynamicMap that produces an element) into a HoloViews object that represents the current selection state. """ def __call__(self, element): return self def build_selection(self, selection_streams, hvobj, operations, region_stream=None, cache=None): if cache is None: cache = {} raise NotImplementedError() @staticmethod def _select(element, selection_expr, cache=None): if cache is None: cache = {} from .element import Curve, Spread from .util.transform import dim if isinstance(selection_expr, dim): dataset = element.dataset mask = None if dataset._plot_id in cache: ds_cache = cache[dataset._plot_id] if selection_expr in ds_cache: mask = ds_cache[selection_expr] else: ds_cache.clear() else: ds_cache = cache[dataset._plot_id] = {} try: if dataset.interface.gridded: if mask is None: mask = selection_expr.apply(dataset, expanded=True, flat=False, strict=False) selection = dataset.clone(dataset.interface.mask(dataset, ~mask)) elif dataset.interface.multi: if mask is None: mask = selection_expr.apply(dataset, expanded=False, flat=False, strict=False) selection = dataset.iloc[mask] elif isinstance(element, (Curve, Spread)) and hasattr(dataset.interface, 'mask'): if mask is None: mask = selection_expr.apply(dataset, keep_index=True, strict=False) selection = dataset.clone(dataset.interface.mask(dataset, ~mask)) else: if mask is None: mask = selection_expr.apply(dataset, compute=False, keep_index=True, strict=False) selection = dataset.select(selection_mask=mask) except KeyError as e: key_error = str(e).replace('"', '').replace('.', '') raise CallbackError("linked_selection aborted because it could not " f"display selection for all elements: {key_error} on '{element!r}'.") from e except Exception as e: raise CallbackError("linked_selection aborted because it could not " "display selection for all elements: %s." % e) from e ds_cache[selection_expr] = mask else: selection = element return selection
[docs]class NoOpSelectionDisplay(SelectionDisplay): """ Selection display class that returns input element unchanged. For use with elements that don't support displaying selections. """ def build_selection(self, selection_streams, hvobj, operations, region_stream=None, cache=None): return hvobj
[docs]class OverlaySelectionDisplay(SelectionDisplay): """ Selection display base class that represents selections by overlaying colored subsets on top of the original element in an Overlay container. """ def __init__(self, color_prop='color', is_cmap=False, supports_region=True): if not isinstance(color_prop, (list, tuple)): self.color_props = [color_prop] else: self.color_props = color_prop self.is_cmap = is_cmap self.supports_region = supports_region def _get_color_kwarg(self, color): return {color_prop: [color] if self.is_cmap else color for color_prop in self.color_props} def build_selection(self, selection_streams, hvobj, operations, region_stream=None, cache=None): from .element import Histogram num_layers = len(selection_streams.style_stream.colors) if not num_layers: return Overlay() layers = [] for layer_number in range(num_layers): streams = [selection_streams.exprs_stream] obj = hvobj.clone(link=False) if layer_number == 1 else hvobj cmap_stream = selection_streams.cmap_streams[layer_number] layer = obj.apply( self._build_layer_callback, streams=[cmap_stream]+streams, layer_number=layer_number, cache=cache, per_element=True ) layers.append(layer) for layer_number in range(num_layers): layer = layers[layer_number] cmap_stream = selection_streams.cmap_streams[layer_number] streams = [selection_streams.style_stream, cmap_stream] layer = layer.apply( self._apply_style_callback, layer_number=layer_number, streams=streams, per_element=True ) layers[layer_number] = layer # Build region layer if region_stream is not None and self.supports_region: def update_region(element, region_element, colors, **kwargs): unselected_color = colors[0] if region_element is None: region_element = element._empty_region() return self._style_region_element(region_element, unselected_color) streams = [region_stream, selection_streams.style_stream] region = hvobj.clone(link=False).apply(update_region, streams, link_dataset=False) eltype = hvobj.type if isinstance(hvobj, DynamicMap) else type(hvobj) if getattr(eltype, '_selection_dims', None) == 1 or issubclass(eltype, Histogram): layers.insert(1, region) else: layers.append(region) return Overlay(layers).collate() @classmethod def _inject_cmap_in_pipeline(cls, pipeline, cmap): operations = [] for op in pipeline.operations: if hasattr(op, 'cmap'): op = op.instance(cmap=cmap) operations.append(op) return pipeline.instance(operations=operations) def _build_layer_callback(self, element, exprs, layer_number, cmap, cache, **kwargs): selection = self._select(element, exprs[layer_number], cache) pipeline = element.pipeline if cmap is not None: pipeline = self._inject_cmap_in_pipeline(pipeline, cmap) if element is selection: return pipeline(element.dataset) else: return pipeline(selection) def _apply_style_callback(self, element, layer_number, colors, cmap, alpha, **kwargs): opts = {} if layer_number == 0: opts['colorbar'] = False else: alpha = 1 if cmap is not None: opts['cmap'] = cmap color = colors[layer_number] if colors else None return self._build_element_layer(element, color, alpha, **opts) def _build_element_layer(self, element, layer_color, layer_alpha, selection_expr=True): raise NotImplementedError() def _style_region_element(self, region_element, unselected_cmap): raise NotImplementedError()
[docs]class ColorListSelectionDisplay(SelectionDisplay): """ Selection display class for elements that support coloring by a vectorized color list. """ def __init__(self, color_prop='color', alpha_prop='alpha', backend=None): self.color_props = [color_prop] self.alpha_props = [alpha_prop] self.backend = backend def build_selection(self, selection_streams, hvobj, operations, region_stream=None, cache=None): if cache is None: cache = {} def _build_selection(el, colors, alpha, exprs, **kwargs): from .plotting.util import linear_gradient ds = el.dataset selection_exprs = exprs[1:] unselected_color = colors[0] # Use darker version of unselected_color if not selected color provided unselected_color = unselected_color or "#e6e9ec" backup_clr = linear_gradient(unselected_color, "#000000", 7)[2] selected_colors = [c or backup_clr for c in colors[1:]] n = len(ds) clrs = np.array([unselected_color] + list(selected_colors)) color_inds = np.zeros(n, dtype='int8') for i, expr in zip(range(1, len(clrs)), selection_exprs): if not expr: color_inds[:] = i else: color_inds[expr.apply(ds)] = i colors = clrs[color_inds] color_opts = {color_prop: colors for color_prop in self.color_props} return el.pipeline(ds).opts(backend=self.backend, clone=True, **color_opts) sel_streams = [selection_streams.style_stream, selection_streams.exprs_stream] hvobj = hvobj.apply(_build_selection, streams=sel_streams, per_element=True, cache=cache) for op in operations: hvobj = op(hvobj) return hvobj
def _color_to_cmap(color): """ Create a light to dark cmap list from a base color """ from .plotting.util import linear_gradient # Lighten start color by interpolating toward white start_color = linear_gradient("#ffffff", color, 7)[2] # Darken end color by interpolating toward black end_color = linear_gradient(color, "#000000", 7)[2] return linear_gradient(start_color, end_color, 64)