How to effiiciently rebuild pandas hdfstore table

2019-05-05 07:46发布

I am working on using the hdfstore in pandas to data frames from an ongoing iterative process. At each iteration, I append to a table in the hdfstore. Here is a toy example:

import pandas as pd
from pandas import HDFStore
import numpy as np
from random import choice
from string import ascii_letters
alphanum=np.array(list(ascii_letters)+range(0,9))
def hdfstore_append(storefile,key,df,format="t",columns=None,data_columns=None):
    if df is None:
        return
    if key[0]!='/':
        key='/'+key
    with HDFStore(storefile) as store:
        if key not in store.keys():
            store.put(key,df,format=format,columns=columns,data_columns=data_columns)
        else:
            try:
                store.append(key,df)
            except Exception as inst:
                df = pd.concat([store.get(key),df])
                store.put(key,df,format=format,columns=columns,
                          data_columns=data_columns)

storefile="db.h5"
for i in range(0,100):
    df=pd.DataFrame([dict(n=np.random.randn(),
                       s=''.join(alphanum[np.random.randint(1,len(alphanum),np.random.randint(1,2*(i+1))]))],index=[i])
    hdfstore_append(storefile,'/SO/df',df,columns=df.columns,data_columns=True)

The hdfstore_append function guards against the various exceptions hdfstore.append throws, and rebuilds the table when necessary. The issue with this approach is that it gets very slow when the table in the store becomes very large.

Is there a more efficient way to do this?

1条回答
冷血范
2楼-- · 2019-05-05 08:14

Below is an example of an efficient method for building large pandas hdfstores. The key is to cache the frame numbers when the tables becomes large. Also instead of appending, removing pre-existing data will essentially create a put.

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
import six
import logging
import os
from abc import ABCMeta, abstractmethod, abstractproperty
import warnings

import pandas as pd

logger = logging.getLogger(__name__)


class FramewiseData(object):
    "Abstract base class defining a data container with framewise access."

    __metaclass__ = ABCMeta

    @abstractmethod
    def put(self, df):
        pass

    @abstractmethod
    def get(self, frame_no):
        pass

    @abstractproperty
    def frames(self):
        pass

    @abstractmethod
    def close(self):
        pass

    @abstractproperty
    def t_column(self):
        pass

    def __getitem__(self, frame_no):
        return self.get(frame_no)

    def __len__(self):
        return len(self.frames)

    def dump(self, N=None):
        """Return data from all, or the first N, frames in a single DataFrame
        Parameters
        ----------
        N : integer
            optional; if None, return all frames
        Returns
        -------
        DataFrame
        """
        if N is None:
            return pd.concat(iter(self))
        else:
            i = iter(self)
            return pd.concat((next(i) for _ in range(N)))

    @property
    def max_frame(self):
        return max(self.frames)

    def _validate(self, df):
        if self.t_column not in df.columns:
            raise ValueError("Cannot write frame without a column "
                             "called {0}".format(self.t_column))
        if df[self.t_column].nunique() != 1:
            raise ValueError("Found multiple values for 'frame'. "
                             "Write one frame at a time.")

    def __iter__(self):
        return self._build_generator()

    def _build_generator(self):
        for frame_no in self.frames:
            yield self.get(frame_no)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

KEY_PREFIX = 'Frame_'
len_key_prefix = len(KEY_PREFIX)


def code_key(frame_no):
    "Turn the frame_no into a 'natural name' string idiomatic of HDFStore"
    key = '{0}{1}'.format(KEY_PREFIX, frame_no)
    return key


def decode_key(key):
    frame_no = int(key[len_key_prefix:])
    return frame_no


class PandasHDFStore(FramewiseData):
    """An interface to an HDF5 file with framewise access, using pandas.
    Save each frame's data to a node in a pandas HDFStore.
    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, mode='a', t_column='frame', **kwargs):
        self.filename = os.path.abspath(filename)
        self._t_column = t_column
        self.store = pd.HDFStore(self.filename, mode, **kwargs)

    @property
    def t_column(self):
        return self._t_column

    @property
    def max_frame(self):
        return max(self.frames)

    def put(self, df):
        if len(df) == 0:
            warnings.warn('An empty DataFrame was passed to put(). Continuing.')
            return
        frame_no = df[self.t_column].values[0]  # validated to be all the same
        key = code_key(frame_no)
        # Store data as tabular instead of fixed-format.
        # Make sure remove any prexisting data, so don't really 'append'.
        try:
            self.store.remove(key)
        except KeyError:
            pass
        self.store.put(key, df, format='table')

    def get(self, frame_no):
        key = code_key(frame_no)
        frame = self.store.get(key)
        return frame

    @property
    def frames(self):
        """Returns sorted list of integer frame numbers in file"""
        return self._get_frame_nos()

    def _get_frame_nos(self):
        """Returns sorted list of integer frame numbers in file"""
        # Pandas' store.keys() scans the entire file looking for stored Pandas
        # structures. This is very slow for large numbers of frames.
        # Instead, scan the root level of the file for nodes with names
        # matching our scheme; we know they are DataFrames.
        r = [decode_key(key) for key in self.store.root._v_children.keys() if
             key.startswith(KEY_PREFIX)]
        r.sort()
        return r

    def close(self):
        self.store.close()


class PandasHDFStoreBig(PandasHDFStore):
    """Like PandasHDFStore, but keeps a cache of frame numbers.
    This can give a large performance boost when a file contains thousands
    of frames.
    If a file was made in PandasHDFStore, opening it with this class
    and then closing it will add a cache (if mode != 'r').
    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, mode='a', t_column='frame', **kwargs):
        self._CACHE_NAME = '_Frames_Cache'
        self._frames_cache = None
        self._cache_dirty = False  # Whether _frames_cache needs to be written out
        super(PandasHDFStoreBig, self).__init__(filename, mode, t_column,
                                                **kwargs)

    @property
    def frames(self):
        # Hit memory cache, then disk cache
        if self._frames_cache is not None:
            return self._frames_cache
        else:
            try:
                self._frames_cache = list(self.store[self._CACHE_NAME].index.values)
                self._cache_dirty = False
            except KeyError:
                self._frames_cache = self._get_frame_nos()
                self._cache_dirty = True # In memory, but not in file
            return self._frames_cache

    def put(self, df):
        self._invalidate_cache()
        super(PandasHDFStoreBig, self).put(df)

    def rebuild_cache(self):
        """Delete cache on disk and rebuild it."""
        self._invalidate_cache()
        _ = self.frames # Compute cache
        self._flush_cache()

    def _invalidate_cache(self):
        self._frames_cache = None
        try:
            del self.store[self._CACHE_NAME]
        except KeyError: pass

    def _flush_cache(self):
        """Writes frame cache if dirty and file is writable."""
        if (self._frames_cache is not None and self._cache_dirty
                and self.store.root._v_file._iswritable()):
            self.store[self._CACHE_NAME] = pd.DataFrame({'dummy': 1},
                                                        index=self._frames_cache)
            self._cache_dirty = False

    def close(self):
        """Updates cache, writes if necessary, then closes file."""
        if self.store.root._v_file._iswritable():
            _ = self.frames # Compute cache
            self._flush_cache()
        super(PandasHDFStoreBig, self).close()


class PandasHDFStoreSingleNode(FramewiseData):
    """An interface to an HDF5 file with framewise access,
    using pandas, that is faster for cross-frame queries.
    This implementation is more complex than PandasHDFStore,
    but it simplifies (speeds up?) cross-frame queries,
    like queries for a single probe's entire trajectory.
    Any additional keyword arguments to the constructor are passed to
    pandas.HDFStore().
    """

    def __init__(self, filename, key='FrameData', mode='a', t_column='frame',
                 use_tabular_copy=False, **kwargs):
        self.filename = os.path.abspath(filename)
        self.key = key
        self._t_column = t_column
        self.store = pd.HDFStore(self.filename, mode, **kwargs)

        with pd.get_store(self.filename) as store:
            try:
                store[self.key]
            except KeyError:
                pass
            else:
                self._validate_node(use_tabular_copy)

    @property
    def t_column(self):
        return self._t_column

    def put(self, df):
        if len(df) == 0:
            warnings.warn('An empty DataFrame was passed to put(). Continuing.')
            return
        self._validate(df)
        self.store.append(self.key, df, data_columns=True)

    def get(self, frame_no):
        frame = self.store.select(self.key, '{0} == {1}'.format(
            self._t_column, frame_no))
        return frame

    def dump(self, N=None):
        """Return data from all, or the first N, frames in a single DataFrame
        Parameters
        ----------
        N : integer
            optional; if None, return all frames
        Returns
        -------
        DataFrame
        """
        if N is None:
            return self.store.select(self.key)
        else:
            Nth_frame = self.frames[N - 1]
            return self.store.select(self.key, '{0} <= {1}'.format(
                self._t_column, Nth_frame))

    def close(self):
        self.store.close()

    def __del__(self):
        if hasattr(self, 'store'):
            self.close()

    @property
    def frames(self):
        """Returns sorted list of integer frame numbers in file"""
        # I assume one column can fit in memory, which is not ideal.
        # Chunking does not seem to be implemented for select_column.
        frame_nos = self.store.select_column(self.key, self.t_column).unique()
        frame_nos.sort()
        return frame_nos

    def _validate_node(self, use_tabular_copy):
        # The HDFStore might be non-tabular, which means we cannot select a
        # subset, and this whole structure will not work.
        # For convenience, this can rewrite the table into a tabular node.
        if use_tabular_copy:
            self.key = _make_tabular_copy(self.filename, self.key)

        pandas_type = getattr(getattr(getattr(
            self.store._handle.root, self.key, None), '_v_attrs', None),
            'pandas_type', None)
        if not pandas_type == 'frame_table':
            raise ValueError("This node is not tabular. Call with "
                             "use_tabular_copy=True to proceed.")


def _make_tabular_copy(store, key):
    """Copy the contents nontabular node in a pandas HDFStore
    into a tabular node"""
    tabular_key = key + '/tabular'
    logger.info("Making a tabular copy of %s at %s", (key, tabular_key))
    store.append(tabular_key, store.get(key), data_columns=True)
    return tabular_key
查看更多
登录 后发表回答