I want to process stock level-2 data in pandas. Suppose, for simplicity, that each row holds four kinds of data:
- millis: timestamp, int64
- last_price: the last trade price, float64
- ask_queue: the volume of ask side, a fixed size (200) array of int32
- bid_queue: the volume of bid side, a fixed size (200) array of int32
This can easily be defined as a structured dtype in numpy:
dtype = np.dtype([
    ('millis', 'int64'),
    ('last_price', 'float64'),
    ('ask_queue', ('int32', 200)),
    ('bid_queue', ('int32', 200))
])
That way, I can access ask_queue and bid_queue like this:
In [17]: data = np.random.randint(0, 100, 1616 * 5).view(dtype)
# compute the average of ask_queue over levels 5 to 9 (slice 5:10)
In [18]: data['ask_queue'][:, 5:10].mean(axis=1)
Out[18]:
array([33.2, 51. , 54.6, 53.4, 15. , 37.8, 29.6, 58.6, 32.2, 51.6, 34.4,
43.2, 58.4, 26.8, 54. , 59.4, 58.8, 38.8, 35.2, 71.2])
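The `.view(dtype)` call in In [16] appears to depend on the integer width that `np.random.randint` returns by default, which differs between platforms (20 records out of `1616 * 5` integers corresponds to 32-bit integers). As a minimal sketch, assuming a hypothetical record count `n_rows`, the same kind of data can be built field by field so that the result does not depend on that default:

```python
import numpy as np

dtype = np.dtype([
    ('millis', 'int64'),
    ('last_price', 'float64'),
    ('ask_queue', ('int32', 200)),
    ('bid_queue', ('int32', 200)),
])

n_rows = 20  # hypothetical record count, chosen to match the output above
rng = np.random.default_rng(0)

# Allocate the structured array first, then fill each field explicitly.
data = np.zeros(n_rows, dtype=dtype)
data['millis'] = np.arange(n_rows, dtype=np.int64)
data['last_price'] = rng.random(n_rows)
data['ask_queue'] = rng.integers(0, 100, size=(n_rows, 200), dtype=np.int32)
data['bid_queue'] = rng.integers(0, 100, size=(n_rows, 200), dtype=np.int32)

# Each packed record occupies 8 + 8 + 200*4 + 200*4 bytes.
print(dtype.itemsize)

# The nested field is exposed as an ordinary 2D array of shape (n_rows, 200).
ask_mean = data['ask_queue'][:, 5:10].mean(axis=1)
print(data['ask_queue'].shape, ask_mean.shape)
```

The values differ from the output shown above because the random data is generated differently; only the shapes and dtypes are meant to match.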
My question is how to define a DataFrame that holds this data.
There are two solutions here:
A. Set ask_queue and bid_queue as two columns holding array values, as follows:
In [5]: df = pd.DataFrame(data.tolist(), columns=data.dtype.names)
In [6]: df.dtypes
Out[6]:
millis int64
last_price float64
ask_queue object
bid_queue object
dtype: object
However, there are at least two problems with this solution:
- ask_queue and bid_queue lose the dtype of a 2D array and all the convenient methods that come with it;
- Performance, since each column becomes an array of objects rather than a 2D array.
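To make the first problem concrete, here is a small sketch (with a hypothetical 4-row frame named `df_a`, not the data from the question) showing that an object column of arrays has to be stacked back into a 2D array before any numpy-style slicing works:

```python
import numpy as np
import pandas as pd

n_rows = 4  # hypothetical, kept small for illustration
rng = np.random.default_rng(0)
queues = rng.integers(0, 100, size=(n_rows, 200), dtype=np.int32)

# One 1D array per cell: pandas stores the column with dtype=object.
df_a = pd.DataFrame({
    'millis': np.arange(n_rows, dtype=np.int64),
    'ask_queue': list(queues),
})
print(df_a['ask_queue'].dtype)

# The 2D layout is gone; it has to be rebuilt (and copied) before slicing.
ask_2d = np.stack(df_a['ask_queue'].to_numpy())
level_mean = ask_2d[:, 5:10].mean(axis=1)
print(ask_2d.shape, ask_2d.dtype, level_mean.shape)
```

The `np.stack` call copies all rows on every access, which is one way the performance concern shows up in practice.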
B. Flatten ask_queue and bid_queue into 2 * 200 columns:
In [8]: ntype = np.dtype([('millis', 'int64'), ('last_price', 'float64')] +
...: [(f'{name}{i}', 'int32') for name in ['ask', 'bid'] for i in range(200)])
In [9]: df = pd.DataFrame.from_records(data.view(ntype))
In [10]: df.dtypes
Out[10]:
millis int64
last_price float64
ask0 int32
ask1 int32
ask2 int32
ask3 int32
ask4 int32
ask5 int32
...
This is better than solution A, but the 2 * 200 columns look redundant.
Is there a solution that keeps the advantages of the structured dtype in numpy?
I wonder whether ExtensionArray or ExtensionDtype can solve this.
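One possible refinement of solution B, offered only as a sketch and not something tested in this post, is to put the flattened columns under a two-level column index, so that each side can be pulled out as one 2D block. The names `df_b`, `side` and `level` are illustrative assumptions, and the millis and last_price fields are left out for brevity:

```python
import numpy as np
import pandas as pd

n_rows, n_levels = 4, 200  # hypothetical sizes
rng = np.random.default_rng(0)
ask = rng.integers(0, 100, size=(n_rows, n_levels), dtype=np.int32)
bid = rng.integers(0, 100, size=(n_rows, n_levels), dtype=np.int32)

# Two-level columns: ('ask', 0) ... ('ask', 199), ('bid', 0) ... ('bid', 199).
columns = pd.MultiIndex.from_product(
    [['ask', 'bid'], range(n_levels)], names=['side', 'level']
)
df_b = pd.DataFrame(np.hstack([ask, bid]), columns=columns)

# Selecting the outer key returns all 200 levels of that side at once.
ask_block = df_b['ask'].to_numpy()
level_mean = ask_block[:, 5:10].mean(axis=1)
print(ask_block.shape, ask_block.dtype)
```

The frame still has 400 physical columns, so this only tidies the access pattern; it does not change how the data is stored.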
Q : Is there any solution can take the advantage as the structured dtype in numpy?
Working with L2-DoM data brings two complications compared with plain ToB ( Top-of-the-Book ) price-feed data:
a) the native feed is very fast: FIX Protocol or other private data-feeds deliver records with hundreds or thousands ( more during fundamental events on majors ) of L2-DoM changes per millisecond, so both processing and storage must be performance-oriented;
b) any kind of offline analysis has to manipulate and efficiently process large data-sets, as a consequence of item a).
- Storage preferences
- Using numpy-alike syntax preferences
- Performance preferences
Storage preferences : SOLVED
Given that pandas.DataFrame was set as the preferred storage type, let's respect that, even though the syntax and performance preferences may suffer for it.
Going another way is possible, yet it may introduce unknown re-factoring / re-engineering costs that the O/P's operational environment need not, or is already not willing to, bear.
Having said this, the pandas feature limitations have to be taken into the design considerations, and all the other steps will have to live with them unless this preference gets revised at some future time.
numpy-alike syntax : SOLVED
This request is sound and clear, as numpy tools are fast and smartly crafted for high-performance number-crunching. Given the set storage preference, we will implement a pair of numpy-tricks so as to fit into a pandas 2D-DataFrame, all at reasonable costs in both the .STORE and .RETRIEVE directions:
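# on .STORE ( ndarray.dumps() returns the pickled array as bytes; .at avoids chained assignment ):
testDF.at[aRowIDX, 'ask_DoM'] = ask200.dumps()       # type(ask200) <class 'numpy.ndarray'>
# on .RETRIEVE ( np.loads is deprecated since numpy v1.15 in favour of pickle.loads ):
L2_ASK = np.loads( testDF.at[aRowIDX, 'ask_DoM'] )   # type(L2_ASK) <class 'numpy.ndarray'>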
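For readers on a numpy release where `np.loads` is no longer available, the same store / retrieve round trip can be sketched with the standard `pickle` module. This is a minimal illustration under assumptions: the two-row `testDF` with an object column is made up here, and `pickle.dumps` stands in for `ask200.dumps()`.

```python
import pickle

import numpy as np
import pandas as pd

ask200 = np.arange(200, dtype=np.int32)

# Hypothetical frame: the cell must live in an object column to hold bytes.
testDF = pd.DataFrame({'ask_DoM': pd.Series([None, None], dtype=object)})
aRowIDX = 0

# on .STORE: serialise the whole array into one bytes cell
testDF.at[aRowIDX, 'ask_DoM'] = pickle.dumps(ask200)

# on .RETRIEVE: rebuild the ndarray, dtype and shape included
L2_ASK = pickle.loads(testDF.at[aRowIDX, 'ask_DoM'])
print(type(L2_ASK), L2_ASK.dtype, L2_ASK.shape)
```

Only unpickle cells that come from a trusted source, since `pickle.loads` can execute arbitrary code embedded in the payload.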
Performance preferences : TESTED
The net add-on costs of the proposed solution were tested in both the .STORE and .RETRIEVE directions:
A one-time cost in the .STORE direction of no less than 70 [us] and no more than ~ 160 [us] per cell for the given scale of L2_DoM arrays ( avg: 78 [us], StDev: 9-11 [us] ):
>>> [ f( [testDUMPs() for _ in range(1000)] ) for f in (np.min,np.mean,np.std,np.max) ]
[72, 79.284, 11.004153942943548, 150]
[72, 78.048, 10.546135548152224, 160]
[71, 78.584, 9.887971227708949, 139]
[72, 76.9, 8.827332496286745, 132]
A repeating cost in the .RETRIEVE direction of no less than 46 [us] and no more than ~ 123 [us] per cell for the given scale of L2_DoM arrays ( avg: 50 [us], StDev: ~ 9.5 [us] ):
>>> [ f( [testLOADs() for _ in range(1000)] ) for f in (np.min,np.mean,np.std,np.max) ]
[46, 50.337, 9.655194197943405, 104]
[46, 49.649, 9.462272665697178, 123]
[46, 49.513, 9.504293766503643, 123]
[46, 49.77, 8.367165350344164, 114]
[46, 51.355, 6.162434583831296, 89]
Even higher performance is to be expected when using better architecture-aligned int64 datatypes ( yes, at the cost of doubled storage, yet the costs of computation will decide whether this move has a performance edge ), and from a chance to use memoryview-based manipulations, which can shave the add-on latency down to about 22 [us].
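The answer does not show its memoryview-based variant. One plausible reading, given here purely as an assumption, is to store the raw buffer with `tobytes()` and rebuild the array with `np.frombuffer`, which skips the pickle machinery. No timing is claimed for this sketch, and the dtype has to be known on the reading side because raw bytes carry no metadata:

```python
import numpy as np

ask200 = np.arange(200, dtype=np.int32)

# on .STORE: raw little-more-than-memcpy serialisation, 200 * 4 bytes
raw = ask200.tobytes()

# on .RETRIEVE: reinterpret the bytes without copying them;
# the result is read-only because bytes objects are immutable
L2_ASK = np.frombuffer(raw, dtype=np.int32)

# A memoryview over the same bytes also avoids a copy.
mv = memoryview(raw)
print(len(raw), mv.nbytes, L2_ASK.flags.writeable)
```

If the array needs to be modified after retrieval, calling `.copy()` on the result restores a writable array at the cost of one copy.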
Tests were run under py3.5.6, numpy v1.15.2, using:
>>> import numpy as np; ask200 = np.arange( 200, dtype = np.int32 ); s = ask200.dumps()
>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> def testDUMPs():
...     aClk.start()
...     s = ask200.dumps()
...     return aClk.stop()
...
>>> def testLOADs():
...     aClk.start()
...     a = np.loads( s )
...     return aClk.stop()
...
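The harness above needs pyzmq for its `Stopwatch`. A comparable measurement can be sketched with only the standard library, assuming `time.perf_counter_ns` as the clock and `pickle` in place of `ndarray.dumps()` / `np.loads`; the helper name `time_us` is made up for this example, and absolute numbers will differ by platform:

```python
import pickle
import time

import numpy as np

ask200 = np.arange(200, dtype=np.int32)
s = pickle.dumps(ask200)

def time_us(fn, repeats=1000):
    """Return per-call durations of fn() in microseconds."""
    samples = []
    for _ in range(repeats):
        t0 = time.perf_counter_ns()
        fn()
        samples.append((time.perf_counter_ns() - t0) / 1000.0)
    return np.asarray(samples)

dump_us = time_us(lambda: pickle.dumps(ask200))
load_us = time_us(lambda: pickle.loads(s))

# Same summary as in the tests above: min, mean, std, max
for name, samples in (('dumps', dump_us), ('loads', load_us)):
    print(name, [float(f(samples)) for f in (np.min, np.mean, np.std, np.max)])
```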
Platform CPU, cache hierarchy and RAM details:
>>> get_numexpr_cpuinfo_details_on_CPU()
'TLB size'______________________________:'1536 4K pages'
'address sizes'_________________________:'48 bits physical, 48 bits virtual'
'apicid'________________________________:'17'
'bogomips'______________________________:'7199.92'
'bugs'__________________________________:'fxsave_leak sysret_ss_attrs null_seg spectre_v1 spectre_v2'
'cache size'____________________________:'2048 KB'
'cache_alignment'_______________________:'64'
'clflush size'__________________________:'64'
'core id'_______________________________:'1'
'cpu MHz'_______________________________:'1400.000'
'cpu cores'_____________________________:'2'
'cpu family'____________________________:'21'
'cpuid level'___________________________:'13'
'flags'_________________________________:'fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc extd_apicid aperfmperf eagerfpu pni pclmulqdq monitor ssse3 cx16 sse4_1 sse4_2 popcnt aes xsave avx lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3dnowprefetch osvw ibs xop skinit wdt lwp fma4 nodeid_msr topoext perfctr_core perfctr_nb cpb hw_pstate vmmcall arat npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold'
'fpu'___________________________________:'yes'
'fpu_exception'_________________________:'yes'
'initial apicid'________________________:'1'
'microcode'_____________________________:'0x6000626'
'model'_________________________________:'1'
'model name'____________________________:'AMD FX(tm)-4100 Quad-Core Processor'
'physical id'___________________________:'0'
'power management'______________________:'ts ttp tm 100mhzsteps hwpstate cpb'
'processor'_____________________________:'1'
'siblings'______________________________:'4'
'stepping'______________________________:'2'
'vendor_id'_____________________________:'AuthenticAMD'
'wp'____________________________________:'yes'
Pandas has been designed to handle and process two-dimensional data (the kind you would put in a spreadsheet). Because "ask_queue" and "bid_queue" are not one-dimensional series but two-dimensional arrays, you cannot (easily) push them into a Pandas dataframe.
In such cases, you can use other libraries such as xarray: http://xarray.pydata.org/
import xarray as xr
# Creating variables, first argument is the name of the dimensions
last_price = xr.Variable("millis", data["last_price"])
ask_queue = xr.Variable(("millis", "levels"), data["ask_queue"])
bid_queue = xr.Variable(("millis", "levels"), data["bid_queue"])
# Putting the variables in a dataset, the multidimensional equivalent of a Pandas
# dataframe
ds = xr.Dataset({"last_price": last_price, "ask_queue": ask_queue,
"bid_queue": bid_queue}, coords={"millis": data["millis"]})
# Computing the average of ask_queue level 5~10
ds["ask_queue"][{"levels": slice(5,10)}].mean(axis=1)