Source code for

import sys

import numpy as np
import pandas as pd

from .. import util
from ..dimension import Dimension
from ..element import Element
from ..ndmapping import NdMapping, item_check, sorted_context
from .interface import Interface
from .pandas import PandasInterface

[docs]class DaskInterface(PandasInterface): """ The DaskInterface allows a Dataset objects to wrap a dask DataFrame object. Using dask allows loading data lazily and performing out-of-core operations on the data, making it possible to work on datasets larger than memory. The DaskInterface covers almost the complete API exposed by the PandasInterface with two notable exceptions: 1) Sorting is not supported and any attempt at sorting will be ignored with an warning. 2) Dask does not easily support adding a new column to an existing dataframe unless it is a scalar, add_dimension will therefore error when supplied a non-scalar value. 4) Not all functions can be easily applied to a dask dataframe so some functions applied with aggregate and reduce will not work. """ types = () datatype = 'dask' default_partitions = 100
[docs] @classmethod def loaded(cls): return 'dask.dataframe' in sys.modules and 'pandas' in sys.modules
[docs] @classmethod def applies(cls, obj): if not cls.loaded(): return False import dask.dataframe as dd return isinstance(obj, (dd.DataFrame, dd.Series))
@classmethod def init(cls, eltype, data, kdims, vdims): import dask.dataframe as dd data, dims, extra = PandasInterface.init(eltype, data, kdims, vdims) if not isinstance(data, dd.DataFrame): data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False) kdims = [ if isinstance(d, Dimension) else d for d in dims['kdims']] # If a key dimension can be found, speculatively reset index # to work around lacking dask support for MultiIndex if any(d for d in kdims if d not in data.columns): reset = data.reset_index() if all(d for d in kdims if d in reset.columns): data = reset return data, dims, extra
[docs] @classmethod def compute(cls, dataset): return dataset.clone(
[docs] @classmethod def persist(cls, dataset): return dataset.clone(
@classmethod def shape(cls, dataset): return (len(, len( @classmethod def range(cls, dataset, dimension): import dask.dataframe as dd dimension = dataset.get_dimension(dimension, strict=True) column =[] if column.dtype.kind == 'O': column = np.sort(column[column.notnull()].compute()) return (column[0], column[-1]) if len(column) else (None, None) else: if dimension.nodata is not None: column = cls.replace_value(column, dimension.nodata) return dd.compute(column.min(), column.max()) @classmethod def sort(cls, dataset, by=None, reverse=False): if by is None: by = [] dataset.param.warning('Dask dataframes do not support sorting') return @classmethod def values(cls, dataset, dim, expanded=True, flat=True, compute=True, keep_index=False): dim = dataset.get_dimension(dim) data =[] if not expanded: data = data.unique() if keep_index: return data.compute() if compute else data else: return data.compute().values if compute else data.values
[docs] @classmethod def select_mask(cls, dataset, selection): """ Given a Dataset object and a dictionary with dimension keys and selection keys (i.e. tuple ranges, slices, sets, lists. or literals) return a boolean mask over the rows in the Dataset object that have been selected. """ select_mask = None for dim, k in selection.items(): if isinstance(k, tuple): k = slice(*k) masks = [] alias = dataset.get_dimension(dim).name series =[alias] if isinstance(k, slice): if k.start is not None: # Workaround for dask issue #3392 kval = util.numpy_scalar_to_python(k.start) masks.append(kval <= series) if k.stop is not None: kval = util.numpy_scalar_to_python(k.stop) masks.append(series < kval) elif isinstance(k, (set, list)): iter_slc = None for ik in k: mask = series == ik if iter_slc is None: iter_slc = mask else: iter_slc |= mask masks.append(iter_slc) elif callable(k): masks.append(k(series)) else: masks.append(series == k) for mask in masks: if select_mask is not None: select_mask &= mask else: select_mask = mask return select_mask
@classmethod def select(cls, dataset, selection_mask=None, **selection): df = if selection_mask is not None: return df[selection_mask] selection_mask = cls.select_mask(dataset, selection) indexed = cls.indexed(dataset, selection) df = df if selection_mask is None else df[selection_mask] if indexed and len(df) == 1 and len(dataset.vdims) == 1: return df[dataset.vdims[0].name].compute().iloc[0] return df @classmethod def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs): index_dims = [dataset.get_dimension(d) for d in dimensions] element_dims = [kdim for kdim in dataset.kdims if kdim not in index_dims] group_kwargs = {} if group_type != 'raw' and issubclass(group_type, Element): group_kwargs = dict(util.get_param_values(dataset), kdims=element_dims) group_kwargs.update(kwargs) # Propagate dataset group_kwargs['dataset'] = dataset.dataset data = [] group_by = [ for d in index_dims] groupby = if len(group_by) == 1: column =[group_by[0]] if == 'category': try: indices = ((ind,) for ind in except NotImplementedError: indices = ((ind,) for ind in column.unique().compute()) else: indices = ((ind,) for ind in column.unique().compute()) else: group_tuples =[group_by].itertuples() indices = util.unique_iterator(ind[1:] for ind in group_tuples) for coord in indices: if any(isinstance(c, float) and np.isnan(c) for c in coord): continue if len(coord) == 1: coord = coord[0] group = group_type(groupby.get_group(coord), **group_kwargs) data.append((coord, group)) if issubclass(container_type, NdMapping): with item_check(False), sorted_context(False): return container_type(data, kdims=index_dims) else: return container_type(data) @classmethod def aggregate(cls, dataset, dimensions, function, **kwargs): data = cols = [ for d in dataset.kdims if d in dimensions] vdims = dataset.dimensions('value', label='name') dtypes = data.dtypes numeric = [c for c, dtype in zip(dtypes.index, dtypes.values) if dtype.kind in 'iufc' and c in vdims] reindexed = data[cols+numeric] inbuilts = {'amin': 'min', 'amax': 'max', 'mean': 'mean', 'std': 'std', 'sum': 'sum', 'var': 'var'} if len(dimensions): groups = reindexed.groupby(cols) if (function.__name__ in inbuilts): agg = getattr(groups, inbuilts[function.__name__])() else: agg = groups.apply(function) df = agg.reset_index() else: if (function.__name__ in inbuilts): agg = getattr(reindexed, inbuilts[function.__name__])() else: raise NotImplementedError df = pd.DataFrame(agg.compute()).T dropped = [] for vd in vdims: if vd not in df.columns: dropped.append(vd) return df, dropped
[docs] @classmethod def unpack_scalar(cls, dataset, data): """ Given a dataset object and data in the appropriate format for the interface, return a simple scalar. """ import dask.dataframe as dd if len(data.columns) > 1 or len(data) != 1: return data if isinstance(data, dd.DataFrame): data = data.compute() return data.iat[0,0]
@classmethod def sample(cls, dataset, samples=None): if samples is None: samples = [] data = dims = dataset.dimensions('key', label='name') mask = None for sample in samples: if np.isscalar(sample): sample = [sample] for c, v in zip(dims, sample): dim_mask = data[c]==v if mask is None: mask = dim_mask else: mask |= dim_mask return data[mask] @classmethod def add_dimension(cls, dataset, dimension, dim_pos, values, vdim): data = if not in data.columns: if not np.isscalar(values): if len(values): err = ('Dask dataframe does not support assigning ' 'non-scalar value.') raise NotImplementedError(err) values = None data = data.assign(**{ values}) return data @classmethod def concat_fn(cls, dataframes, **kwargs): import dask.dataframe as dd return dd.concat(dataframes, **kwargs) @classmethod def dframe(cls, dataset, dimensions): if dimensions: return[dimensions].compute() else: return @classmethod def nonzero(cls, dataset): return True
[docs] @classmethod def iloc(cls, dataset, index): """ Dask does not support iloc, therefore iloc will execute the call graph and lose the laziness of the operation. """ rows, cols = index scalar = False if isinstance(cols, slice): cols = [ for d in dataset.dimensions()][cols] elif np.isscalar(cols): scalar = np.isscalar(rows) cols = [dataset.get_dimension(cols).name] else: cols = [dataset.get_dimension(d).name for d in index[1]] if np.isscalar(rows): rows = [rows] data = {} for c in cols: data[c] =[c].compute().iloc[rows].values if scalar: return data[cols[0]][0] return tuple(data.values())