Is there any elegant way to define a dataframe wit

2020-06-09 11:00发布

I want to process stock level-2 data in pandas. Suppose there are four kinds data in each row for simplicity:

  • millis: timestamp, int64
  • last_price: the last trade price, float64,
  • ask_queue: the volume of ask side, a fixed size (200) array of int32
  • bid_queue: the volume of bid side, a fixed size (200) array of int32

Which can be easily defined as a structured dtype in numpy:

dtype = np.dtype([
   ('millis', 'int64'), 
   ('last_price', 'float64'), 
   ('ask_queue', ('int32', 200)), 
   ('bid_queue', ('int32', 200))
])

And in that way, I can access the ask_queue and bid_queue like:

In [17]: data = np.random.randint(0, 100, 1616 * 5).view(dtype)

% compute the average of ask_queue level 5 ~ 10
In [18]: data['ask_queue'][:, 5:10].mean(axis=1)  
Out[18]: 
array([33.2, 51. , 54.6, 53.4, 15. , 37.8, 29.6, 58.6, 32.2, 51.6, 34.4,
       43.2, 58.4, 26.8, 54. , 59.4, 58.8, 38.8, 35.2, 71.2])

My question is how to define a DataFrame include the data?

There are two solutions here:

A. set the ask_queue and bid_queue as two columns with array values as following:

In [5]: df = pd.DataFrame(data.tolist(), columns=data.dtype.names)

In [6]: df.dtypes
Out[6]: 
millis          int64
last_price    float64
ask_queue      object
bid_queue      object
dtype: object

However, there at least two problems in this solution:

  1. The ask_queue and bid_queue lost the dtype of 2D array and all the convenient methods;
  2. Performance, since it become a array of objects rather than a 2D array.

B. flatten the ask_queue and bid_quene to 2 * 200 columns:

In [8]: ntype = np.dtype([('millis', 'int64'), ('last_price', 'float64')] + 
   ...:                  [(f'{name}{i}', 'int32') for name in ['ask', 'bid'] for i in range(200)])

In [9]: df = pd.DataFrame.from_records(data.view(ntype))

In [10]: df.dtypes
Out[10]: 
millis          int64
last_price    float64
ask0            int32
ask1            int32
ask2            int32
ask3            int32
ask4            int32
ask5            int32
...

It's better than solution A. But the 2 * 200 columns looks redundant.

Is there any solution can take the advantage as the structured dtype in numpy? I wonder if the ExtensionArray or `ExtensionDtype' can solve this.

2条回答
孤傲高冷的网名
2楼-- · 2020-06-09 11:29

Pandas has been designed to handle and process two-dimensional data (the kind you would put in a spreadsheet). Because "ask_queue" and "bid_queue" are not mono-dimensional series but two-dimensional arrays, you cannot (easily) push them into a Pandas dataframe.

In such cases, you have to use other libraries such as xarray: http://xarray.pydata.org/

import xarray as xr

# Creating variables, first argument is the name of the dimensions
last_price = xr.Variable("millis", data["last_price"])
ask_queue = xr.Variable(("millis", "levels"), data["ask_queue"])
bid_queue = xr.Variable(("millis", "levels"), data["bid_queue"])

# Putting the variables in a dataset, the multidimensional equivalent of a Pandas
# dataframe
ds = xr.Dataset({"last_price": last_price, "ask_queue": ask_queue,
                 "bid_queue": bid_queue}, coords={"millis": data["millis"]})

# Computing the average of ask_queue level 5~10
ds["ask_queue"][{"levels": slice(5,10)}].mean(axis=1)
查看更多
唯我独甜
3楼-- · 2020-06-09 11:43

Q : Is there any solution can take the advantage as the structured dtype in numpy?

Working with L2-DoM data has two-fold complications, compared to the just ToB ( Top-of-the-Book ) price-feed data. a) the native feed is fast ( very fast / FIX Protocol or other private data-feeds deliver records with hundreds, thousands ( more during fundamental events on majors ) L2-DoM changes per millisecond. Both processing and storage must be performance-oriented b) any kind of offline analyses has to successfully manipulate and efficiently process large data-sets, due to the nature of item a)

  • Storage preferences
  • Using numpy-alike syntax preferences
  • Performance preferences

Storage preferences : SOLVED

Given pandas.DataFrame was set as the preferred storage type, let's respect that, even though syntax and performance preferences may take adverse impacts.

Going other way is possible, yet may introduce unknown re-factoring / re-engineering costs, that the O/P's operational environment need not or already is not willing to bear.

Having said this, pandas feature limitations have to be put into the design considerations and all the other steps will have to live with it, unless this preference might get revised in some future time.


numpy-alike syntax : SOLVED

This request is sound and clear, as numpy tools are fast and smart crafted for high-performance number-crunching. Given the set storage preference, we will implement a pair of numpy-tricks so as to fit into pandas 2D-DataFrame all at reasonable costs on both the .STORE and .RETRIEVE directions:

 # on .STORE:
 testDF['ask_DoM'][aRowIDX] = ask200.dumps()      # type(ask200) <class 'numpy.ndarray'>

 # on .RETRIEVE:
 L2_ASK = np.loads( testDF['ask_DoM'][aRowIDX] )  # type(L2_ASK) <class 'numpy.ndarray'>

Performance preferences : TESTED

Net add-on costs of the proposed solution for both .STORE and .RETRIEVE directions were tested to take:

A one-time cost on .STORE direction of no less than 70 [us] and no more than ~ 160 [us] per cell for given scales of L2_DoM arrays ( avg: 78 [ms] StDev: 9-11 [ms] ):

>>> [ f( [testDUMPs() for _ in range(1000)] ) for f in (np.min,np.mean,np.std,np.max) ]
[72, 79.284, 11.004153942943548, 150]
[72, 78.048, 10.546135548152224, 160]
[71, 78.584,  9.887971227708949, 139]
[72, 76.9,    8.827332496286745, 132]

A repeating cost on .RETRIEVE direction of no less than 46 [us] and no more than ~ 123 [us] per cell for given scales of L2_DoM arrays ( avg: 50 [us] StDev: 9.5 [us] ):

>>> [ f( [testLOADs() for _ in range(1000)] ) for f in (np.min,np.mean,np.std,np.max) ]
[46, 50.337, 9.655194197943405, 104]
[46, 49.649, 9.462272665697178, 123]
[46, 49.513, 9.504293766503643, 123]
[46, 49.77,  8.367165350344164, 114]
[46, 51.355, 6.162434583831296,  89]

Even higher performance is to be expected if using better architecture-aligned int64 datatypes ( yes, at a cost of doubled costs of storage, yet the costs of computations will decide if this move has a performance edge ) and from a chance to use memoryview-based manipulations, that can cut the throat down and shave the add-on latency to about 22 [us].


Test were run under py3.5.6, numpy v1.15.2, using:

>>> import numpy as np; ask200 = np.arange( 200, dtype = np.int32 ); s = ask200.dumps()
>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> def testDUMPs():
...     aClk.start()
...     s = ask200.dumps()
...     return aClk.stop()
... 
>>> def testLOADs():
...     aClk.start()
...     a = np.loads( s )
...     return aClk.stop()
...

Platform CPU, cache hierarchy and RAM details:

>>> get_numexpr_cpuinfo_details_on_CPU()

'TLB size'______________________________:'1536 4K pages'
'address sizes'_________________________:'48 bits physical, 48 bits virtual'
'apicid'________________________________:'17'
'bogomips'______________________________:'7199.92'
'bugs'__________________________________:'fxsave_leak sysret_ss_attrs null_seg spectre_v1 spectre_v2'
'cache size'____________________________:'2048 KB'
'cache_alignment'_______________________:'64'
'clflush size'__________________________:'64'
'core id'_______________________________:'1'
'cpu MHz'_______________________________:'1400.000'
'cpu cores'_____________________________:'2'
'cpu family'____________________________:'21'
'cpuid level'___________________________:'13'
'flags'_________________________________:'fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc extd_apicid aperfmperf eagerfpu pni pclmulqdq monitor ssse3 cx16 sse4_1 sse4_2 popcnt aes xsave avx lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3dnowprefetch osvw ibs xop skinit wdt lwp fma4 nodeid_msr topoext perfctr_core perfctr_nb cpb hw_pstate vmmcall arat npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold'
'fpu'___________________________________:'yes'
'fpu_exception'_________________________:'yes'
'initial apicid'________________________:'1'
'microcode'_____________________________:'0x6000626'
'model'_________________________________:'1'
'model name'____________________________:'AMD FX(tm)-4100 Quad-Core Processor'
'physical id'___________________________:'0'
'power management'______________________:'ts ttp tm 100mhzsteps hwpstate cpb'
'processor'_____________________________:'1'
'siblings'______________________________:'4'
'stepping'______________________________:'2'
'vendor_id'_____________________________:'AuthenticAMD'
'wp'____________________________________:'yes'
查看更多
登录 后发表回答