Source code for holoviews.core.data.pandas

import numpy as np
import pandas as pd
from packaging.version import Version
from pandas.api.types import is_numeric_dtype

from .. import util
from ..dimension import Dimension, dimension_name
from ..element import Element
from ..ndmapping import NdMapping, item_check, sorted_context
from .interface import DataError, Interface
from .util import finite_range


class PandasAPI:
    """
    This class is used to describe the interface as having a pandas-like API.

    The reason to have this class is that it is not always possible to
    directly inherit from the PandasInterface.

    This class should not have any logic as it should be used like:

        if issubclass(interface, PandasAPI):
            ...
    """
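# A minimal sketch of the check described in the docstring above (the
# ``interface`` variable is hypothetical, standing in for whatever
# interface class a caller is inspecting):
#
#     if issubclass(interface, PandasAPI):
#         ...  # safe to assume the backing data exposes a pandas-like API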
class PandasInterface(Interface, PandasAPI):

    types = (pd.DataFrame,)

    datatype = 'dataframe'

    @classmethod
    def dimension_type(cls, dataset, dim):
        name = dataset.get_dimension(dim, strict=True).name
        idx = list(dataset.data.columns).index(name)
        return dataset.data.dtypes.iloc[idx].type

    @classmethod
    def init(cls, eltype, data, kdims, vdims):
        element_params = eltype.param.objects()
        kdim_param = element_params['kdims']
        vdim_param = element_params['vdims']
        if util.is_series(data):
            name = data.name or util.anonymous_dimension_label
            data = data.to_frame(name=name)
        if util.is_dataframe(data):
            ncols = len(data.columns)
            index_names = data.index.names if isinstance(data, pd.DataFrame) else [data.index.name]
            if index_names == [None]:
                index_names = ['index']
            if eltype._auto_indexable_1d and ncols == 1 and kdims is None:
                kdims = list(index_names)

            if isinstance(kdim_param.bounds[1], int):
                ndim = min([kdim_param.bounds[1], len(kdim_param.default)])
            else:
                ndim = None
            nvdim = vdim_param.bounds[1] if isinstance(vdim_param.bounds[1], int) else None
            if kdims and vdims is None:
                vdims = [c for c in data.columns if c not in kdims]
            elif vdims and kdims is None:
                kdims = [c for c in data.columns if c not in vdims][:ndim]
            elif kdims is None:
                kdims = list(data.columns[:ndim])
                if vdims is None:
                    vdims = [d for d in data.columns[ndim:((ndim+nvdim) if nvdim else None)]
                             if d not in kdims]
            elif kdims == [] and vdims is None:
                vdims = list(data.columns[:nvdim if nvdim else None])

            if any(not isinstance(d, (str, Dimension)) for d in kdims+vdims):
                raise DataError(
                    "Having a non-string as a column name in a DataFrame is not supported."
                )

            # Handle reset of index if kdims reference index by name
            for kd in kdims:
                kd = dimension_name(kd)
                if kd in data.columns:
                    continue
                if any(kd == ('index' if name is None else name)
                       for name in index_names):
                    data = data.reset_index()
                    break
            if kdims:
                kdim = dimension_name(kdims[0])
                if eltype._auto_indexable_1d and ncols == 1 and kdim not in data.columns:
                    data = data.copy()
                    data.insert(0, kdim, np.arange(len(data)))
            for d in kdims+vdims:
                d = dimension_name(d)
                if len([c for c in data.columns if c == d]) > 1:
                    raise DataError('Dimensions may not reference duplicated DataFrame '
                                    'columns (found duplicate %r columns). If you want to plot '
                                    'a column against itself simply declare two dimensions '
                                    'with the same name.' % d, cls)
        else:
            # Check if data is of non-numeric type
            # Then use defined data type
            kdims = kdims if kdims else kdim_param.default
            vdims = vdims if vdims else vdim_param.default
            columns = list(util.unique_iterator([dimension_name(d) for d in kdims+vdims]))

            if isinstance(data, dict) and all(c in data for c in columns):
                data = dict((d, data[d]) for d in columns)
            elif isinstance(data, list) and len(data) == 0:
                data = {c: np.array([]) for c in columns}
            elif isinstance(data, (list, dict)) and data in ([], {}):
                data = None
            elif (isinstance(data, dict) and not all(d in data for d in columns) and
                  not any(isinstance(v, np.ndarray) for v in data.values())):
                column_data = sorted(data.items())
                k, v = column_data[0]
                if len(util.wrap_tuple(k)) != len(kdims) or len(util.wrap_tuple(v)) != len(vdims):
                    raise ValueError("Dictionary data not understood, should contain a column "
                                     "per dimension or a mapping between key and value dimension "
                                     "values.")
                column_data = zip(*((util.wrap_tuple(k)+util.wrap_tuple(v))
                                    for k, v in column_data))
                data = dict(((c, col) for c, col in zip(columns, column_data)))
            elif isinstance(data, np.ndarray):
                if data.ndim == 1:
                    if eltype._auto_indexable_1d and len(kdims)+len(vdims) > 1:
                        data = (np.arange(len(data)), data)
                    else:
                        data = np.atleast_2d(data).T
                else:
                    data = tuple(data[:, i] for i in range(data.shape[1]))

            if isinstance(data, tuple):
                data = [np.array(d) if not isinstance(d, np.ndarray) else d for d in data]
                min_dims = (kdim_param.bounds[0] or 0) + (vdim_param.bounds[0] or 0)
                if any(d.ndim > 1 for d in data):
                    raise ValueError('PandasInterface cannot interpret multi-dimensional arrays.')
                elif len(data) < min_dims:
                    raise DataError('Data contains fewer columns than the %s element expects. Expected '
                                    'at least %d columns but found only %d columns.' %
                                    (eltype.__name__, min_dims, len(data)))
                elif not cls.expanded(data):
                    raise ValueError('PandasInterface expects data to be of uniform shape.')
                data = pd.DataFrame(dict(zip(columns, data)), columns=columns)
            elif ((isinstance(data, dict) and any(c not in data for c in columns)) or
                  (isinstance(data, list) and any(isinstance(d, dict) and c not in d
                                                  for d in data for c in columns))):
                raise ValueError('PandasInterface could not find specified dimensions in the data.')
            else:
                data = pd.DataFrame(data, columns=columns)
        return data, {'kdims': kdims, 'vdims': vdims}, {}

    @classmethod
    def isscalar(cls, dataset, dim):
        name = dataset.get_dimension(dim, strict=True).name
        return len(dataset.data[name].unique()) == 1

    @classmethod
    def validate(cls, dataset, vdims=True):
        dim_types = 'all' if vdims else 'key'
        dimensions = dataset.dimensions(dim_types, label='name')
        cols = list(dataset.data.columns)
        not_found = [d for d in dimensions if d not in cols]
        if not_found:
            raise DataError("Supplied data does not contain specified "
                            "dimensions, the following dimensions were "
                            "not found: %s" % repr(not_found), cls)

    @classmethod
    def range(cls, dataset, dimension):
        dimension = dataset.get_dimension(dimension, strict=True)
        column = dataset.data[dimension.name]
        if column.dtype.kind == 'O':
            if (not isinstance(dataset.data, pd.DataFrame) or
                    util.pandas_version < Version('0.17.0')):
                column = column.sort(inplace=False)
            else:
                column = column.sort_values()
            try:
                column = column[~column.isin([None, pd.NA])]
            except Exception:
                pass
            if not len(column):
                return np.nan, np.nan
            return column.iloc[0], column.iloc[-1]
        else:
            if dimension.nodata is not None:
                column = cls.replace_value(column, dimension.nodata)
            cmin, cmax = finite_range(column, column.min(), column.max())
            if column.dtype.kind == 'M' and getattr(column.dtype, 'tz', None):
                return (cmin.to_pydatetime().replace(tzinfo=None),
                        cmax.to_pydatetime().replace(tzinfo=None))
            return cmin, cmax

    @classmethod
    def concat_fn(cls, dataframes, **kwargs):
        if util.pandas_version >= Version('0.23.0'):
            kwargs['sort'] = False
        return pd.concat(dataframes, **kwargs)

    @classmethod
    def concat(cls, datasets, dimensions, vdims):
        dataframes = []
        for key, ds in datasets:
            data = ds.data.copy()
            for d, k in zip(dimensions, key):
                data[d.name] = k
            dataframes.append(data)
        return cls.concat_fn(dataframes)

    @classmethod
    def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs):
        index_dims = [dataset.get_dimension(d, strict=True) for d in dimensions]
        element_dims = [kdim for kdim in dataset.kdims
                        if kdim not in index_dims]

        group_kwargs = {}
        if group_type != 'raw' and issubclass(group_type, Element):
            group_kwargs = dict(util.get_param_values(dataset),
                                kdims=element_dims)
        group_kwargs.update(kwargs)

        # Propagate dataset
        group_kwargs['dataset'] = dataset.dataset

        group_by = [d.name for d in index_dims]
        if len(group_by) == 1 and util.pandas_version >= Version("1.5.0"):
            # Because of this deprecation warning from pandas 1.5.0:
            # In a future version of pandas, a length 1 tuple will be returned
            # when iterating over a groupby with a grouper equal to a list of
            # length 1. Don't supply a list with a single grouper to avoid
            # this warning.
            group_by = group_by[0]
        data = [(k, group_type(v, **group_kwargs)) for k, v in
                dataset.data.groupby(group_by, sort=False)]
        if issubclass(container_type, NdMapping):
            with item_check(False), sorted_context(False):
                return container_type(data, kdims=index_dims)
        else:
            return container_type(data)

    @classmethod
    def aggregate(cls, dataset, dimensions, function, **kwargs):
        data = dataset.data
        cols = [d.name for d in dataset.kdims if d in dimensions]
        vdims = dataset.dimensions('value', label='name')
        reindexed = data[cols+vdims]
        if function in [np.std, np.var]:
            # Fix for consistency with other backends:
            # pandas uses ddof=1 for std and var
            fn = lambda x: function(x, ddof=0)
        else:
            fn = util._PANDAS_FUNC_LOOKUP.get(function, function)
        if len(dimensions):
            # The reason to use `numeric_cols` is to prepare for when pandas
            # will no longer automatically drop non-numerical columns for
            # numerical functions, e.g., `np.mean`.
            # pandas started warning about this in v1.5.0
            if function in [np.size]:
                # np.size actually works with non-numerical columns
                numeric_cols = [
                    c for c in reindexed.columns if c not in cols
                ]
            else:
                numeric_cols = [
                    c for c, d in zip(reindexed.columns, reindexed.dtypes)
                    if is_numeric_dtype(d) and c not in cols
                ]
            grouped = reindexed.groupby(cols, sort=False)
            df = grouped[numeric_cols].aggregate(fn, **kwargs).reset_index()
        else:
            agg = reindexed.apply(fn, **kwargs)
            data = {col: [v] for col, v in zip(agg.index, agg.values)}
            df = pd.DataFrame(data, columns=list(agg.index))

        dropped = []
        for vd in vdims:
            if vd not in df.columns:
                dropped.append(vd)

        return df, dropped
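    # Roughly, for a pandas-backed Dataset ``ds`` with kdims ['x'] and vdims
    # ['y'] (illustrative names), ``cls.aggregate(ds, ['x'], np.mean)`` is a
    # sketch-level equivalent of:
    #
    #     ds.data.groupby('x', sort=False)[['y']].mean().reset_index()
    #
    # with np.std/np.var swapped for their ddof=0 variants as noted above.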
    @classmethod
    def unpack_scalar(cls, dataset, data):
        """
        Given a dataset object and data in the appropriate format for
        the interface, return a simple scalar.
        """
        if len(data) != 1 or len(data.columns) > 1:
            return data
        return data.iat[0, 0]
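    # For example (illustrative values; ``ds`` is any pandas-backed Dataset):
    #
    #     cls.unpack_scalar(ds, pd.DataFrame({'y': [3.0]}))       # -> 3.0
    #     cls.unpack_scalar(ds, pd.DataFrame({'y': [1.0, 2.0]}))  # returned as-is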
    @classmethod
    def reindex(cls, dataset, kdims=None, vdims=None):
        # DataFrame based tables don't need to be reindexed
        return dataset.data

    @classmethod
    def mask(cls, dataset, mask, mask_value=np.nan):
        masked = dataset.data.copy()
        cols = [vd.name for vd in dataset.vdims]
        masked.loc[mask, cols] = mask_value
        return masked

    @classmethod
    def redim(cls, dataset, dimensions):
        column_renames = {k: v.name for k, v in dimensions.items()}
        return dataset.data.rename(columns=column_renames)

    @classmethod
    def sort(cls, dataset, by=None, reverse=False):
        if by is None:
            by = []
        cols = [dataset.get_dimension(d, strict=True).name for d in by]

        if (not isinstance(dataset.data, pd.DataFrame) or
                util.pandas_version < Version('0.17.0')):
            return dataset.data.sort(columns=cols, ascending=not reverse)
        return dataset.data.sort_values(by=cols, ascending=not reverse)

    @classmethod
    def select(cls, dataset, selection_mask=None, **selection):
        df = dataset.data
        if selection_mask is None:
            selection_mask = cls.select_mask(dataset, selection)
        indexed = cls.indexed(dataset, selection)
        if isinstance(selection_mask, pd.Series):
            df = df[selection_mask]
        else:
            df = df.iloc[selection_mask]
        if indexed and len(df) == 1 and len(dataset.vdims) == 1:
            return df[dataset.vdims[0].name].iloc[0]
        return df

    @classmethod
    def values(cls, dataset, dim, expanded=True, flat=True, compute=True,
               keep_index=False):
        dim = dataset.get_dimension(dim, strict=True)
        data = dataset.data[dim.name]
        if keep_index:
            return data
        if data.dtype.kind == 'M' and getattr(data.dtype, 'tz', None):
            data = data.dt.tz_localize(None)
        if not expanded:
            return pd.unique(data)
        return data.values if hasattr(data, 'values') else data

    @classmethod
    def sample(cls, dataset, samples=None):
        if samples is None:
            samples = []
        data = dataset.data
        mask = None
        for sample in samples:
            sample_mask = None
            if np.isscalar(sample):
                sample = [sample]
            for i, v in enumerate(sample):
                submask = data.iloc[:, i] == v
                if sample_mask is None:
                    sample_mask = submask
                else:
                    sample_mask &= submask
            if mask is None:
                mask = sample_mask
            else:
                mask |= sample_mask
        return data[mask]

    @classmethod
    def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
        data = dataset.data.copy()
        if dimension.name not in data:
            data.insert(dim_pos, dimension.name, values)
        return data

    @classmethod
    def assign(cls, dataset, new_data):
        return dataset.data.assign(**new_data)
    @classmethod
    def as_dframe(cls, dataset):
        """
        Returns the data of a Dataset as a dataframe, avoiding a copy
        if it is already a dataframe type.
        """
        if issubclass(dataset.interface, PandasInterface):
            return dataset.data
        else:
            return dataset.dframe()
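    # In other words, ``cls.as_dframe(ds)`` hands back ``ds.data`` itself
    # when ``ds`` is already pandas-backed; only other interfaces pay the
    # cost of materializing a new DataFrame via ``ds.dframe()``.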
    @classmethod
    def dframe(cls, dataset, dimensions):
        if dimensions:
            return dataset.data[dimensions]
        else:
            return dataset.data.copy()

    @classmethod
    def iloc(cls, dataset, index):
        rows, cols = index
        scalar = False
        columns = list(dataset.data.columns)
        if isinstance(cols, slice):
            cols = [d.name for d in dataset.dimensions()][cols]
        elif np.isscalar(cols):
            scalar = np.isscalar(rows)
            cols = [dataset.get_dimension(cols).name]
        else:
            cols = [dataset.get_dimension(d).name for d in index[1]]
        cols = [columns.index(c) for c in cols]
        if np.isscalar(rows):
            rows = [rows]

        if scalar:
            return dataset.data.iloc[rows[0], cols[0]]
        return dataset.data.iloc[rows, cols]
Interface.register(PandasInterface)
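
# A minimal usage sketch (assumes the standard ``import holoviews as hv``;
# the column names are illustrative): tabular elements constructed from a
# DataFrame are routed through PandasInterface.init, which infers kdims and
# vdims from the columns.
#
#     import holoviews as hv
#     df = pd.DataFrame({'x': [0, 1, 2], 'y': [1.0, 4.0, 9.0]})
#     curve = hv.Curve(df)  # kdims=['x'], vdims=['y'] inferred
#     assert curve.interface is PandasInterface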