Improve Query Performance From a Large HDFStore Ta

2019-03-11 17:13发布

问题:

I have a large (~160 million rows) dataframe that I've stored to disk with something like this:

    def fillStore(store, tablename):
        files = glob.glob('201312*.csv')
        names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
        for f in files:
            df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
            store.append(tablename, df, format='table', data_columns=['c_id','f_id'])

The table has a time index and I will query using c_id and f_id in addition to times (via the index).

I have another dataframe containing ~18000 "incidents." Each incident consists of some (as few as hundreds, as many as hundreds of thousands) individual records. I need to collect some simple statistics for each incident and store them in order to collect some aggregate statistics. Currently I do this like so:

def makeQueryString(c, f, start, stop):
    return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
    for ind, row in inc_times.iterrows():
        incidents = incidents.append(store.select(tablename, 
                                                  makeQueryString(row.c_id, 
                                                                  row.f_id, 
                                                                  row.start, 
                                                                  row.stop))).fillna(ind)
    return incidents

This all works fine except for the fact that each store.select() statement takes roughly 5 seconds which means that processing the full month's worth of data requires somewhere between 24-30 hours of processing. Meanwhile, the actual statistics I need are relatively simple:

def getIncidentStats(df):
    incLen = (df.index[-1]-df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1
    rqsts = len(df)
    rqstRate_s = rqsts/incLen
    return pd.Series({'c_id':df.c_id[0],
                      'f_id':df.fqdn_id[0],
                      'Length_sec':incLen, 
                      'num_rqsts':rqsts, 
                      'rqst_rate':rqstRate_s, 
                      'avg_resp_size':df.response_len.mean(), 
                      'std_resp_size':df.response_len.std()})


incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)

My question is: how can I improve the performance or efficiency of any part of this work flow? (Please note that I actually batch most of the jobs to get and store incidents one day at a time simply because I want to limit the risk of losing already processed data in the even of a crash. I left this code out here for simplicity and because I actually need to process the whole month's data.)

Is there a way to process the data as I receive it from the store and is there any benefit to this? Would I benefit from using store.select_as_index? If I receive an index I'd still need to access the data to get the statistics correct?

Other notes/questions: I have compared the performance of storing my HDFStore on both a SSD and normal hard drive and didn't notice any improvement for the SSD. Is this expected?

I also toyed with the idea of creating a large conjunction of query strings and asking for them all at once. This causes memory errors when the total query string is too large (~5-10 queries).

Edit 1 If it matters, I am using tables version 3.1.0 and pandas version 0.13.1

Edit 2 Here is some more information:

ptdump -av store.h5
/ (RootGroup) ''
  /._v_attrs (AttributeSet), 4 attributes:
   [CLASS := 'GROUP',
    PYTABLES_FORMAT_VERSION := '2.0',
    TITLE := '',
    VERSION := '1.0']
/all_recs (Group) ''
  /all_recs._v_attrs (AttributeSet), 14 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    data_columns := ['c_id', 'f_id'],
    encoding := None,
    index_cols := [(0, 'index')],
    info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
    levels := 1,
    nan_rep := 'nan',
    non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
    pandas_type := 'frame_table',
    pandas_version := '0.10.1',
    table_type := 'appendable_frame',
    values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
  "c_id": Int64Col(shape=(), dflt=0, pos=2),
  "f_id": Int64Col(shape=(), dflt=0, pos=3)}
  byteorder := 'little'
  chunkshape := (5461,)
  autoindex := True
  colindexes := {
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
  /all_recs/table._v_attrs (AttributeSet), 19 attributes:
   [CLASS := 'TABLE',
    FIELD_0_FILL := 0,
    FIELD_0_NAME := 'index',
    FIELD_1_FILL := 0,
    FIELD_1_NAME := 'values_block_0',
    FIELD_2_FILL := 0,
    FIELD_2_NAME := 'c_id',
    FIELD_3_FILL := 0,
    FIELD_3_NAME := 'f_id',
    NROWS := 161738653,
    TITLE := '',
    VERSION := '2.6',
    client_id_dtype := 'int64',
    client_id_kind := ['c_id'],
    fqdn_id_dtype := 'int64',
    fqdn_id_kind := ['f_id'],
    index_kind := 'datetime64',
    values_block_0_dtype := 'int64',
    values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]

Here are samples of both the main table and inc_times:

In [12]: df.head()
Out[12]: 
                          c_id        f_id          resp_id      resp_len  \
ts                                                                   
2013-12-04 08:00:00  637092486  5372764353               30      56767543   
2013-12-04 08:00:01  637092486  5399580619               23      61605423   
2013-12-04 08:00:04    5456242  5385485460               21      46742687   
2013-12-04 08:00:04    5456242  5385485460               21      49909681   
2013-12-04 08:00:04  624791800  5373236646               14      70461449   

                              s_id  
ts                           
2013-12-04 08:00:00           1829  
2013-12-04 08:00:01           1724  
2013-12-04 08:00:04           1679  
2013-12-04 08:00:04           1874  
2013-12-04 08:00:04           1727  

[5 rows x 5 columns]


In [13]: inc_times.head()
Out[13]: 
        c_id     f_id                start                 stop
0       7254   196211  1385880945000000000  1385880960000000000
1       9286   196211  1387259840000000000  1387259850000000000
2      16032   196211  1387743730000000000  1387743735000000000
3      19793   196211  1386208175000000000  1386208200000000000
4      19793   196211  1386211800000000000  1386211810000000000

[5 rows x 4 columns]

Regarding c_id and f_id, the set of IDs I want to select from the full store is relatively few compared to the total number of IDs in the store. In other words, there are some popular IDs in inc_times that I will repeatedly query while completely ignoring some of the IDs that exist in the full table. I'd estimate that the Ids I care about are roughly 10% of the total IDs, but that these are the most popular IDs so their records dominate the full set.

I have 16GB RAM. The full store is 7.4G and the full dataset (as a csv file) is only 8.7 GB. Initially I believed I would be able to load the whole thing in memory and at least do some limited operations on it, but I get memory errors on loading the whole thing. Hence, batching it into daily files (the full file consists of data for one month).

回答1:

Here's some recommendations and a similar question is here

Use compression: see here. You should try this (this could make it faster / slower depending on exactly what you are querying), YMMV.

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5

Use a hierarchical query in chunks. What I mean is this. Since you have a relatively small number of c_id and f_id that you care about, structure a single query something like this. This is kind of like using isin.

f_ids = list_of_f_ids that I care about
c_ids = list_of_c_ids that I care about

def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while(True):
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches


results = []
for f_id_batch in create_batches(f_id_list):

    for c_id_batch in create_batches(c_id_list):

        q = "f_id={f_id} & c_id={c_id}".format(
                f_id=f_id_batch,
                c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be max/min
        # time for ALL the included batches though, maybe easy for you to compute

        result = store.select('df',where=q)

        # sub process this result

        def f(x):
            # you will need to filter out the min/max timestamps here (which I gather
            # are somewhat dependent on f_id/c_id group

            #### process the data and return something
            # you could do something like: ``return x.describe()`` for simple stats

         results.append(result.groupby(['f_id','c_id').apply(f))

results = pd.concat(results)

The key here is to process so that the isin DOES not have more that 32 members for any variable that you are querying on. This is an internal numpy/pytables limitation. If you exceed this, the query will work, but it will drop that variable and do a reindex on ALL the data (which is NOT what you want here).

This way you will have a nice subset of data in memory over just a few loops. These queries I think would take about the same time as most of your queries or so, but you will have way fewer.

The query time is roughly constant for a given subset (unless the data is ordered such that it it is completely indexed).

So the query scans 'blocks' of data (which is what the indexes point to). If you have lots of hits across many blocks then the query is slower.

Here's an example

In [5]: N = 100000000

In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])

In [7]: df['c_id'] = np.random.randint(0,10,size=N)

In [8]: df['f_id'] = np.random.randint(0,10,size=N)

In [9]: df.index = date_range('20130101',periods=N,freq='s')

In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])

In [11]: df.head()
Out[11]: 
                            A         B         C  c_id  f_id
2013-01-01 00:00:00  0.037287  1.153534  0.639669     8     7
2013-01-01 00:00:01  1.741046  0.459821  0.194282     8     3
2013-01-01 00:00:02 -2.273919 -0.141789  0.770567     1     1
2013-01-01 00:00:03  0.320879 -0.108426 -1.310302     8     6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362     5     5
2013-01-01 00:00:05  1.608211  0.069196  0.025021     3     6
2013-01-01 00:00:06 -0.561690  0.613579  1.071438     8     2
2013-01-01 00:00:07  1.795043 -0.661966  1.210714     0     0
2013-01-01 00:00:08  0.176347 -0.461176  1.624514     3     6
2013-01-01 00:00:09 -1.084537  1.941610 -1.423559     9     1
2013-01-01 00:00:10 -0.101036  0.925010 -0.809951     0     9
2013-01-01 00:00:11 -1.185520  0.968519  2.871983     7     5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014     3     6
2013-01-01 00:00:13  0.544427  0.130439  0.423749     5     7
2013-01-01 00:00:14  0.112216  0.404801 -0.061730     5     4
2013-01-01 00:00:15 -1.349838 -0.639435  0.993495     0     9


In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop

In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop

In [4]: %timeit pd.read_hdf('test.2h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop

This particular example is 5GB uncompressed and 2.9GB compressed. These results are on the compressed data. In THIS case it is actually quite a bit faster to use the uncompressed (e.g. the first loop taked 3.5s). This is 100MM rows.

So using the last example (4) you are getting 9x the data of the first in a little over 3x the query time.

However your speedup should be MUCH more, because you won't be selecting on individual timestamps, rather doing that later.

This whole approach takes into account that you have enough main memory to hold your results in the batch sizes (e.g. you are selecting a relatively small part of the set in the batch queries).