Streams

In [1]:
import numpy as np
import holoviews as hv
hv.notebook_extension()

In the DynamicMap tutorial we saw how DynamicMap allows you to explore high dimensional data using the widgets in the same style as HoloMaps. Although suitable for unbounded exploration of large parameter spaces, the DynamicMaps described in that notebook support exactly the same mode of interaction as HoloMaps. In particular, the key dimensions are used to specify a set of widgets that when manipulated apply the appopriate indexing to invoke the user-supplied callable.

In this tutorial we will explore the HoloViews streams system that allows any sort of value to be supplied from anywhere. This system opens a huge set of new possible visualization types, including continuously updating plots that reflect live data as well as dynamic visualizations that can be interacted with directly, as described in the Linked Streams tutorial.

In [2]:
# Styles and plot options used in this tutorial
%opts Ellipse [bgcolor='w'] (color='b')
%opts Image (cmap='viridis')
%opts VLine (color='r' linewidth=2) HLine (color='r' linewidth=1)
%opts Path [show_grid=False bgcolor='w'] (color='k' linestyle='-.')
%opts Area (hatch='\\' facecolor='cornsilk' linewidth=2 edgecolor='k')

A simple DynamicMap

Before introducing streams, let us declare a simple DynamicMap of the sort discussed in the DynamicMap tutorial. This example consists of an Curve element showing a Lissajous curve with VLine and HLine annotation to form a crosshair:

In [3]:
lin = np.linspace(-np.pi,np.pi,300)

def lissajous(t, a,b, delta):
    return (np.sin(a * t + delta), np.sin(b * t))

def lissajous_curve(t, a=3,b=5, delta=np.pi/2):
    (x,y) = lissajous(t,a,b,delta)
    return hv.Path(lissajous(lin,a,b,delta)) * hv.VLine(x) * hv.HLine(y)

hv.DynamicMap(lissajous_curve, kdims=['t']).redim.range(t=(-3.,3.))
Out[3]:

As expected, the declared key dimension (kdims) has turned into slider widgets that let us move the crosshair along the curve. Now let's see how to position the crosshair using streams.

Introducing streams

The core concept behind a stream is simple: it is a parameter that can change over time that automatically refreshes code depending on those parameter values.

Like all objects in HoloViews, these parameters are declared using param and you can define streams as a parameterized subclass of the holoviews.streams.Stream. A more convenient way is to use the Stream.define classmethod:

In [4]:
from holoviews.streams import Stream, param
Time = Stream.define('Time', t=0.0)

This results in a Time class with a numeric t parameter that defaults to zero. As this object is parameterized, we can use hv.help to view it's parameters:

In [5]:
hv.help(Time)
Parameters of 'Time'
====================

Parameters changed from their default values are marked in red.
Soft bound values are marked in cyan.
C/V= Constant/Variable, RO/RW = ReadOnly/ReadWrite, AN=Allow None

NameValue  Type   Mode 

t   0.0  Number  C RW 

Parameter docstrings:
=====================

t: < No docstring available >

This parameter is a param.Number as we supplied a float, if we have supplied an integer it would have been a param.Integer. Notice that there is no docstring in the help output above but we can add one by explicit defining the parameter as follows:

In [6]:
Time = Stream.define('Time', t=param.Number(default=0.0, doc='A time parameter'))
hv.help(Time)
Parameters of 'Time'
====================

Parameters changed from their default values are marked in red.
Soft bound values are marked in cyan.
C/V= Constant/Variable, RO/RW = ReadOnly/ReadWrite, AN=Allow None

NameValue  Type   Mode 

t   0.0  Number  V RW 

Parameter docstrings:
=====================

t: A time parameter

Now we have defined this Time stream class, we can make of an instance of it and looks at its parameters:

In [7]:
time_dflt = Time()
print('This Time instance has parameter t={t}'.format(t=time_dflt.t))
This Time instance has parameter t=0.0

As with all parameterized classes, we can instantiate our parameters with a suitable value of our choice instead of relying on defaults.

In [8]:
time = Time(t=np.pi/4)
print('This Time instance has parameter t={t}'.format(t=time.t))
This Time instance has parameter t=0.7853981633974483

For more information on defining Stream classes this way, use hv.help(Stream.define).

Simple streams example

We can now supply this streams object to a DynamicMap using the same lissajous_curve callback above by adding it to the streams list:

In [9]:
dmap = hv.DynamicMap(lissajous_curve, streams=[time])
dmap + lissajous_curve(t=np.pi/4)
Out[9]:

Immediately we see that the crosshair position of the DynamicMap reflects the t parameter values we set on the Time stream. This means that the t parameter was supplied as the argument to the lissajous_curve callback. As we now have no key dimensions, there is no widgets for the t dimensions.

Although we have what looks like a static plot, it is in fact dynamic and can be updated in place at any time. To see this, we can call the event method on our DynamicMap:

In [10]:
dmap.event( t=0.2)

Running this cell will have updated the crosshair from its original position where $t=\frac{\pi}{4}$ to a new position where t=0.2. Try running the cell above with different values of t and watch the plot update!

This event method is the recommended way of updating the stream parameters on a DynamicMap but if you have a handle on the relevant stream instance, you can also call the event method on that:

In [11]:
time.event(t=-0.2)

Running the cell above also move the crosshair to a new position. As there are no key dimensions, there is only a single valid (empty) key that can be accessed with dmap[()] or dmap.select() making event the only way to explore new parameters.

We will examine the event method and the machinery that powers streams in more detail later in the tutorial after we have looked at more examples of how streams are used in practice.

Working with multiple streams

The previous example showed a curve parameterized by a single dimension t. Often you will have multiple stream parameters you would like to declare as follows:

In [12]:
ls = np.linspace(0, 10, 200)
xx, yy = np.meshgrid(ls, ls)

XY = Stream.define('XY',x=0.0,y=0.0)

def marker(x,y):
    return hv.Image(np.sin(xx)*np.cos(yy)) * hv.VLine(x) * hv.HLine(y)

dmap = hv.DynamicMap(marker, streams=[XY()])
dmap
Out[12]:

You can update both x and y by passing multiple keywords to the event method:

In [13]:
dmap.event(x=-0.2, y=0.1)

Note that the definition above behaves the same as this definition where we define separate X and Y stream classes:

X = Stream.define('X',x=0.0)
Y = Stream.define('Y',y=0.0)
hv.DynamicMap(crosshairs, streams=[X(),Y()])

The reason why you might want to list multiple streams instead of always defining a single stream containing all the required stream parameters will be made clear in the Linked Streams tutorial.

Combining streams and key dimensions

All the DynamicMap examples above can't be indexed with anything other than dmap[()] or dmap.select() as none of them had any key dimensions. This was to focus exclusively on the streams system at the start of the tutorial and not because you can't combine key dimensions and streams:

In [14]:
%opts Curve (linestyle='-')
xs = np.linspace(-3, 3, 400)

def function(xs, time):
    "Some time varying function"
    return np.exp(np.sin(xs+np.pi/time))

def integral(limit, time):
    curve = hv.Curve((xs, function(xs, time)))[limit:]
    area = hv.Area((xs, function(xs, time)))[:limit]
    summed = area.dimension_values('y').sum() * 0.015  # Numeric approximation
    return (area * curve * hv.VLine(limit) * hv.Text(limit + 0.5, 2.0, '%.2f' % summed))

Time = Stream.define('Time', time=1.0)
dmap=hv.DynamicMap(integral, kdims=['limit'], streams=[Time()]).redim.range(limit=(-3,2))
dmap
Out[14]:

In this example, you can drag the slider to see a numeric approximation to the integral on the left side on the VLine.

As 'limit' is declared as a key dimension, it is given a normal HoloViews slider. As we have also defined a time stream, we can update the displayed curve for any time value:

In [15]:
dmap.event(time=8)

We now see how to control the time argument of the integral function by triggered an event with a new time value, and how to control the limit argument by moving a slider. Controlling limit with a slider this way is valid but also a little unintuitive: what if you could control limit just by hovering over the plot?

In the Linked Streams tutorial, we will see how we can do exactly this by switching to the bokeh backend and using the linked streams system.

Matching names to arguments

Note that in the example above, the key dimension names and the stream parameter names match the arguments to the callable. This must be true for stream parameters but this isn't a requirement of key dimensions: if you replace the word 'radius' with 'size' in the example above after XY is defined, the example still works.

Here are the rules regarding the callback argument names:

  • If your key dimensions and stream parameters match the callable argument names, the definition is valid.
  • If your callable accepts mandatory positional arguments and their number matches the number of key dimensions, the names don't need to match and these arguments will be passed key dimensions values.

As stream parameters always need to match the argument names, there is a method to allow them to be easily renamed. Let's say you imported a stream class as shown in the Linked Streams tutorial or for this example, reuse the existing XY stream class. You can then use the rename method allowing the following definition:

In [16]:
def integral2(lim, t): 
    'Same as integral with different argument names'
    return integral(lim, t)

dmap = hv.DynamicMap(integral2, kdims=['limit'], streams=[Time().rename(time='t')]).redim.range(limit=(-3.,3.))
dmap
Out[16]:

Overlapping stream and key dimensions

In the above example above, the stream parameters do not overlap with the declared key dimension. What happens if we add 'time' to the declared key dimensions?

In [17]:
dmap=hv.DynamicMap(integral, kdims=['time','limit'], streams=[Time()]).redim.range(limit=(-3.,3.))
dmap
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/miniconda/envs/anacondaviz/lib/python3.6/site-packages/IPython/core/formatters.py in __call__(self, obj, include, exclude)
    968 
    969             if method is not None:
--> 970                 return method(include=include, exclude=exclude)
    971             return None
    972         else:

~/topographica/external/holoviews/holoviews/core/dimension.py in _repr_mimebundle_(self, include, exclude)
   1229         combined and returned.
   1230         """
-> 1231         return Store.render(self)
   1232 
   1233 

~/topographica/external/holoviews/holoviews/core/options.py in render(cls, obj)
   1287         data, metadata = {}, {}
   1288         for hook in hooks:
-> 1289             ret = hook(obj)
   1290             if ret is None:
   1291                 continue

~/topographica/external/holoviews/holoviews/ipython/display_hooks.py in pprint_display(obj)
    278     if not ip.display_formatter.formatters['text/plain'].pprint:
    279         return None
--> 280     return display(obj, raw_output=True)
    281 
    282 

~/topographica/external/holoviews/holoviews/ipython/display_hooks.py in display(obj, raw_output, **kwargs)
    254     elif isinstance(obj, (HoloMap, DynamicMap)):
    255         with option_state(obj):
--> 256             output = map_display(obj)
    257     elif isinstance(obj, Plot):
    258         output = render(obj)

~/topographica/external/holoviews/holoviews/ipython/display_hooks.py in wrapped(element)
    140         try:
    141             max_frames = OutputSettings.options['max_frames']
--> 142             mimebundle = fn(element, max_frames=max_frames)
    143             if mimebundle is None:
    144                 return {}, {}

~/topographica/external/holoviews/holoviews/ipython/display_hooks.py in map_display(vmap, max_frames)
    208         return None
    209 
--> 210     return render(vmap)
    211 
    212 

~/topographica/external/holoviews/holoviews/ipython/display_hooks.py in render(obj, **kwargs)
     63         renderer = renderer.instance(fig='png')
     64 
---> 65     return renderer.components(obj, **kwargs)
     66 
     67 

~/topographica/external/holoviews/holoviews/plotting/renderer.py in components(self, obj, fmt, comm, **kwargs)
    324         data, metadata = {}, {}
    325         if isinstance(plot, NdWidget):
--> 326             js, html = plot(as_script=True)
    327             plot_id = plot.plot_id
    328             widget_id = plot.id

~/topographica/external/holoviews/holoviews/plotting/widgets/__init__.py in __call__(self, as_script)
    163 
    164     def __call__(self, as_script=False):
--> 165         data = self._get_data()
    166         html = self.render_html(data)
    167         js = self.render_js(data)

~/topographica/external/holoviews/holoviews/plotting/widgets/__init__.py in _get_data(self)
    446 
    447     def _get_data(self):
--> 448         data = super(SelectionWidget, self)._get_data()
    449         widgets, dimensions, init_dim_vals = self.get_widgets()
    450         key_data = {} if self.plot.dynamic else self.get_key_data()

~/topographica/external/holoviews/holoviews/plotting/widgets/__init__.py in _get_data(self)
    191             json_path = json_path + '/'
    192         dynamic = json.dumps(self.plot.dynamic) if self.plot.dynamic else 'false'
--> 193         return dict(CDN=CDN, frames=self.get_frames(), delay=delay,
    194                     cached=cached, load_json=load_json, mode=mode, id=self.id,
    195                     Nframes=len(self.plot), widget_name=name, json_path=json_path,

~/topographica/external/holoviews/holoviews/plotting/mpl/widgets.py in get_frames(self)
     15             return super(MPLWidget, self).get_frames()
     16         else:
---> 17             frames = {0: self._plot_figure(self.init_key)}
     18         return self.encode_frames(frames)
     19 

~/topographica/external/holoviews/holoviews/plotting/mpl/widgets.py in _plot_figure(self, idx)
     21     def _plot_figure(self, idx):
     22         with self.renderer.state():
---> 23             self.plot.update(idx)
     24             if self.renderer.fig == 'auto':
     25                 figure_format = self.renderer.params('fig').objects[0]

~/topographica/external/holoviews/holoviews/plotting/mpl/plot.py in update(self, key)
    243         if len(self) == 1 and ((key == 0) or (key == self.keys[0])) and not self.drawn:
    244             return self.initialize_plot()
--> 245         return self.__getitem__(key)
    246 
    247 

~/topographica/external/holoviews/holoviews/plotting/plot.py in __getitem__(self, frame)
    250         if not isinstance(frame, tuple):
    251             frame = self.keys[frame]
--> 252         self.update_frame(frame)
    253         return self.state
    254 

~/topographica/external/holoviews/holoviews/plotting/mpl/plot.py in wrapper(self, *args, **kwargs)
     41     def wrapper(self, *args, **kwargs):
     42         with _rc_context(self.fig_rcparams):
---> 43             return f(self, *args, **kwargs)
     44     return wrapper
     45 

~/topographica/external/holoviews/holoviews/plotting/mpl/element.py in update_frame(self, key, ranges, element)
    822         reused = isinstance(self.hmap, DynamicMap) and self.overlaid
    823         if element is None and not reused:
--> 824             element = self._get_frame(key)
    825         elif element is not None:
    826             self.current_frame = element

~/topographica/external/holoviews/holoviews/plotting/plot.py in _get_frame(self, key)
    732         cached = self.current_key is None
    733         key_map = dict(zip([d.name for d in self.dimensions], key))
--> 734         frame = get_plot_frame(self.hmap, key_map, cached)
    735         traverse_setter(self, '_force', False)
    736 

~/topographica/external/holoviews/holoviews/plotting/util.py in get_plot_frame(map_obj, key_map, cached)
    253         # Special handling for static plots
    254         return map_obj.last
--> 255     key = tuple(key_map[kd.name] for kd in map_obj.kdims)
    256     if key in map_obj.data and cached:
    257         return map_obj.data[key]

~/topographica/external/holoviews/holoviews/plotting/util.py in <genexpr>(.0)
    253         # Special handling for static plots
    254         return map_obj.last
--> 255     key = tuple(key_map[kd.name] for kd in map_obj.kdims)
    256     if key in map_obj.data and cached:
    257         return map_obj.data[key]

KeyError: 'limit'
Out[17]:
:DynamicMap   [time,limit]
   :Overlay
      .Area.I  :Area   [x]   (y)
      .Curve.I :Curve   [x]   (y)
      .VLine.I :VLine   [x,y]
      .Text.I  :Text   [x,y]

First you might notice that the 'time' value is now shown in the title but that there is no corresponding time slider as its value is supplied by the stream.

The 'time'parameters is now what is called 'dimensioned streams' which renables indexing of these dimensions:

In [18]:
dmap[1,0] + dmap.select(time=3,limit=1.5) + dmap[None,1.5]
Out[18]:

In A, we supply our own values for the 'time and 'limit' parameters. This doesn't change the values of the 'time' parameters on the stream itself but it does allow us to see what would happen when the time value is one. Note the use of None in C as a way of leaving an explicit value unspecified, allowing the current stream value to be used.

This is one good reason to use dimensioned streams - it restores access to convenient indexing and selecting operation as a way of exploring your visualizations. The other reason it is useful is that if you keep all your parameters dimensioned, it re-enables the DynamicMap cache described in the DynamicMap tutorial, allowing you to record your interaction with streams and allowing you to cast to HoloMap for export:

In [19]:
dmap.reset()  # Reset the cache, we don't want the values from the cell above
# TODO: redim the limit dimension to a default of 0
dmap.event(time=1)
dmap.event(time=1.5)
dmap.event(time=2)
hv.HoloMap(dmap)
Out[19]:
:HoloMap   [time,limit]

One use of this would be to have a simulator drive a visualization forward using event in a loop. You could then stop your simulation and retain the a recent history of the output as long as the allowed DynamicMap cache.

Generators and argument-free callables

In addition to callables, Python supports generators that can be defined with the yield keyword. Calling a function that uses yield returns a generator iterator object that accepts no arguments but returns new values when iterated or when next() is applied to it.

HoloViews supports Python generators for completeness and generator expressions can be a convenient way to define code inline instead of using lambda functions. As generators expressions don't accept arguments and can get 'exhausted' we recommend using callables with DynamicMap - exposing the relevant arguments also exposes control over your visualization.

Callables that have arguments, unlike generators allow you to re-visit portions of your parameter space instead of always being forced in one direction via calls to next(). With this caveat in mind, here is an example of a generator and the corresponding generator iterator that returns a BoxWhisker element:

In [20]:
def sample_distributions(samples=10, tol=0.04):
    np.random.seed(42)
    while True:
        gauss1 = np.random.normal(size=samples)
        gauss2 = np.random.normal(size=samples)
        data = (['A']*samples + ['B']*samples, np.hstack([gauss1, gauss2]))
        yield hv.BoxWhisker(data, kdims=['Group'], vdims=['Value'])
        samples+=1
        
sample_generator = sample_distributions()

This returns two box whiskers representing samples from two Gaussian distributions of 10 samples. Iterating over this generator simply resamples from these distributions using an additional sample each time.

As with a callable, we can pass our generator iterator to DynamicMap:

In [21]:
hv.DynamicMap(sample_generator)
Out[21]:

Without using streams, we now have a problem as there is no way to trigger the generator to view the next distribution in the sequence. We can solve this by defining a stream with no parameters:

In [22]:
dmap = hv.DynamicMap(sample_generator, streams=[Stream.define('Next')()])
dmap
Out[22]:

Stream event update loops

Now we can simply use event() to drive the generator forward and update the plot, showing how the two Gaussian distributions converge as the number of samples increase.

In [23]:
for i in range(40):
    dmap.event()

Note that there is a better way to run loops that drive dmap.event() which supports a period (in seconds) between updates and a timeout argument (also in seconds):

In [24]:
dmap.periodic(0.1, 1000, timeout=3)

In this generator example, event does not require any arguments but you can set the param_fn argument to a callable that takes an iteration counter and returns a dictionary for setting the stream parameters. In addition you can use block=False to avoid blocking the notebook using a threaded loop. This can be very useful although there can have two downsides 1. all running visualizations using non-blocking updates will be competing for computing resources 2. if you override a variable that the thread is actively using, there can be issues with maintaining consistent state in the notebook.

Generally, the periodic utility is recommended for all such event update loops and it will be used instead of explicit loops in the rest of the tutorials involving streams.

Using next()

The approach shown above of using an empty stream works in an exactly analogous fashion for callables that take no arguments. In both cases, the DynamicMap next() method is enabled:

In [25]:
hv.HoloMap({i:next(dmap) for i in range(10)}, kdims=['Iteration'])
Out[25]:

Next steps

The streams system allows you to update plots in place making it possible to build live visualizations that update in response to incoming live data or any other type of event. As we have seen in this tutorial, you can use streams together with key dimensions to add additional interactivity to your plots while retaining the familiar widgets.

This tutorial used examples that work with either the matplotlib or bokeh backends. In the Linked Streams tutorial, you will see how you can directly interact with dynamic visualizations when using the bokeh backend.

[Advanced] How streams work

This optional section is not necessary for users who simply want to use the streams system, but it does describes how streams actually work in more detail.

A stream class is one that inherits from Stream that typically defines some new parameters. We have already seen one convenient way of defining a stream class:

In [26]:
defineXY = Stream.define('defineXY', x=0.0, y=0.0)

This is equivalent to the following definition which would be more appropriate in library code or for complex stream class requiring lots of parameters that need to be documented:

In [27]:
class XY(Stream):
    x = param.Number(default=0.0, constant=True, doc='An X position.')
    y = param.Number(default=0.0, constant=True, doc='A Y position.')

As we have already seen, we can make an instance of XY with some initial values for x and y.

In [28]:
xy = XY(x=2,y=3)

However, trying to modify these parameters directly will result in an exception as they have been declared constant (e.g xy.x=4 will throw an error). This is because there are two allowed ways of modifying these parameters, the simplest one being update:

In [29]:
xy.update(x=4,y=50)
xy.rename(x='xpos', y='ypos').contents
Out[29]:
{'xpos': 4, 'ypos': 50}

This shows how you can update the parameters and also shows the correct way to view the stream parameter values via the contents property as this will apply any necessary renaming.

So far, using update has done nothing but forced us to access parameter a certain way. What makes streams work are the side-effects you can trigger when changing a value via the event method. The relevant side-effect is to invoke callables called 'subscribers'

Subscribers

Without defining any subscribes, the event method is identical to update:

In [30]:
xy = XY()
xy.event(x=4,y=50)
xy.contents
Out[30]:
{'x': 4, 'y': 50}

Now let's add a subscriber:

In [31]:
def subscriber(xpos,ypos):
    print('The subscriber received xpos={xpos} and ypos={ypos}'.format(xpos=xpos,ypos=ypos))

xy = XY().rename(x='xpos', y='ypos')
xy.add_subscriber(subscriber)
xy.event(x=4,y=50)
The subscriber received xpos=4 and ypos=50

As we can see, now when you call event, our subscriber is called with the updated parameter values, renamed as appropriate. The event method accepts the original parameter names and the subscriber receives the new values after any renaming is applied. You can add as many subscribers as you want and you can clear them using the clear method:

In [32]:
xy.clear()
xy.event(x=0,y=0)

When you define a DynamicMap using streams, the HoloViews plotting system install the necessary callbacks as subscibers to update the plot when the stream parameters change. The above example clears all subscribers (it is equivalent to clear('all'). To clear only the subscribers you define yourself use clear('user') and to clear any subscribers installed by the HoloViews plotting system use clear('internal').

When using linked streams as described in the Linked Streams tutorial, the plotting system recognizes the stream class and registers the necessary machinery with Bokeh to update the stream values based on direct interaction with the plot.


Download this notebook from GitHub (right-click to download).