是否有一种优雅的方式定义一个带有数组类型列的数据框?

13

我希望在 pandas 中处理股票 level-2 数据。假设每行数据简单地包含四种类型:

  • millis:时间戳,int64 类型
  • last_price:最近的交易价格,float64 类型
  • ask_queue:卖方量,大小为 200 的 int32 类型数组
  • bid_queue:买方量,大小为 200 的 int32 类型数组

它们可以很容易地在 numpy 中定义为结构化 dtype。

dtype = np.dtype([
   ('millis', 'int64'), 
   ('last_price', 'float64'), 
   ('ask_queue', ('int32', 200)), 
   ('bid_queue', ('int32', 200))
])

这样,我就可以像这样访问ask_queuebid_queue:


In [17]: data = np.random.randint(0, 100, 1616 * 5).view(dtype)

% compute the average of ask_queue level 5 ~ 10
In [18]: data['ask_queue'][:, 5:10].mean(axis=1)  
Out[18]: 
array([33.2, 51. , 54.6, 53.4, 15. , 37.8, 29.6, 58.6, 32.2, 51.6, 34.4,
       43.2, 58.4, 26.8, 54. , 59.4, 58.8, 38.8, 35.2, 71.2])

我的问题是如何定义一个包含数据的 DataFrame

这里有两个解决方案:

A. 将 ask_queuebid_queue 设置为两列,其值为数组,如下所示:

In [5]: df = pd.DataFrame(data.tolist(), columns=data.dtype.names)

In [6]: df.dtypes
Out[6]: 
millis          int64
last_price    float64
ask_queue      object
bid_queue      object
dtype: object

然而,这种解决方案存在至少两个问题:

  1. ask_queuebid_queue 丢失了2D数组的数据类型和所有便捷方法;
  2. 性能问题,因为它变成了对象数组而不是2D数组。

B. 将 ask_queuebid_quene 扁平化为 2 * 200 列:

In [8]: ntype = np.dtype([('millis', 'int64'), ('last_price', 'float64')] + 
   ...:                  [(f'{name}{i}', 'int32') for name in ['ask', 'bid'] for i in range(200)])

In [9]: df = pd.DataFrame.from_records(data.view(ntype))

In [10]: df.dtypes
Out[10]: 
millis          int64
last_price    float64
ask0            int32
ask1            int32
ask2            int32
ask3            int32
ask4            int32
ask5            int32
...

这比方案A要好。但是2 * 200列看起来是多余的。

是否有解决方案可以像numpy中的结构化dtype一样发挥优势?我想知道ExtensionArray或`ExtensionDtype'是否可以解决这个问题。


7
pandas 不是用来存储对象的。事物应该本质上组织成二维数组(毕竟它是为PANel DAta设计的)。使用对象类型时,基本上会失去所有有用的功能。第二个选项是最好的。您可以使用 df.loc[:, 'ask5':'ask9'].mean(1) 计算完全相同的均值,这与我认为的numpy功能一样简单。 - ALollz
@Eastsun:我刚刚了解了ExtensionDtype。听起来你可以用它来实现你的目的,但我认为你应该仔细检查一下是否也能够实现你需要的操作,例如对数组片段进行平均值计算。如果不可能,你总是需要使用像map这样的方法,并且可能需要将结构复制到另一个numpy表示中才能执行它。这可能会使它变得非常慢。另一方面,如果你在ExtensionDtype中实现你的API,由于它是实验性的,所以实现可能会在pandas的新版本中被破坏。 - jottbe
@jottbe,我刚刚发现并阅读了这篇博客:https://tomaugspurger.github.io/pandas-extension-arrays.html,这篇博客中提到的方法似乎也可以解决我的问题。稍后我会深入研究它。 - Eastsun
也许这个答案能帮到你。祝好! - cincin21
2个回答

6

问:是否有任何解决方案可以利用 numpy 中的结构化 dtype ?

与仅使用ToB(最佳价格)数据相比,使用L2-DoM数据有两个复杂性。 a)原始数据传输速度很快(非常快/ FIX协议或其他私有数据传输记录每毫秒更改数百,数千个(在主要基本事件期间为更多)。处理和存储必须针对性能进行优化。b)任何类型的离线分析都必须成功地操纵并高效地处理大型数据集,由于第a项的特性

  • 存储偏好
  • 使用类似于numpy的语法偏好
  • 性能偏好

存储偏好:已解决

如果将pandas.DataFrame设置为首选存储类型,则尊重它,即使语法和性能偏好可能会产生不利影响。

另一种方法是可行的,但可能会引入未知的重构/重新设计成本,而 O / P 的操作环境无需或已不愿承担这些成本。

话虽这么说,必须将pandas功能限制纳入设计考虑,并且除非在将来可能修改此首选项,否则所有其他步骤都必须与其共存。


Numpy-类似语法:已解决

这个请求清晰明了,因为numpy工具被智能地制作成高性能数字处理。鉴于设置的存储偏好,我们将实现一对numpy技巧,以适合pandas 2D-DataFrame,并且在.STORE.RETRIEVE方向上都具有合理的成本:

 # on .STORE:
 testDF['ask_DoM'][aRowIDX] = ask200.dumps()      # type(ask200) <class 'numpy.ndarray'>

 # on .RETRIEVE:
 L2_ASK = np.loads( testDF['ask_DoM'][aRowIDX] )  # type(L2_ASK) <class 'numpy.ndarray'>

性能偏好:已测试

对于 .STORE.RETRIEVE 方向的提议解决方案的网络附加成本进行了测试,结果为:

.STORE 方向上的一次性成本 对于给定 L2_DoM 数组规模的每个单元格不低于70 [us],不超过~160 [us](平均值:78 [ms],标准差:9-11 [ms]):

>>> [ f( [testDUMPs() for _ in range(1000)] ) for f in (np.min,np.mean,np.std,np.max) ]
[72, 79.284, 11.004153942943548, 150]
[72, 78.048, 10.546135548152224, 160]
[71, 78.584,  9.887971227708949, 139]
[72, 76.9,    8.827332496286745, 132]

在给定的L2_DoM数组规模下,每个单元格在.RETRIEVE方向上的重复成本不少于46 [us],不超过~ 123 [us](平均值:50 [us],标准差:9.5 [us])。
>>> [ f( [testLOADs() for _ in range(1000)] ) for f in (np.min,np.mean,np.std,np.max) ]
[46, 50.337, 9.655194197943405, 104]
[46, 49.649, 9.462272665697178, 123]
[46, 49.513, 9.504293766503643, 123]
[46, 49.77,  8.367165350344164, 114]
[46, 51.355, 6.162434583831296,  89]

预计如果使用更好的与架构对齐的 int64 数据类型(是的,成本翻倍,但计算成本将决定此举是否具有性能优势),则可以期望更高的性能,并有机会使用基于memoryview的操作,它们可以大大减少附加延迟并将其削减到约 22 [us]


测试在py3.5.6下运行,numpy v1.15.2,在以下条件下进行:

>>> import numpy as np; ask200 = np.arange( 200, dtype = np.int32 ); s = ask200.dumps()
>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> def testDUMPs():
...     aClk.start()
...     s = ask200.dumps()
...     return aClk.stop()
... 
>>> def testLOADs():
...     aClk.start()
...     a = np.loads( s )
...     return aClk.stop()
...

平台的CPU、缓存层次和RAM细节:

>>> get_numexpr_cpuinfo_details_on_CPU()

'TLB size'______________________________:'1536 4K pages'
'address sizes'_________________________:'48 bits physical, 48 bits virtual'
'apicid'________________________________:'17'
'bogomips'______________________________:'7199.92'
'bugs'__________________________________:'fxsave_leak sysret_ss_attrs null_seg spectre_v1 spectre_v2'
'cache size'____________________________:'2048 KB'
'cache_alignment'_______________________:'64'
'clflush size'__________________________:'64'
'core id'_______________________________:'1'
'cpu MHz'_______________________________:'1400.000'
'cpu cores'_____________________________:'2'
'cpu family'____________________________:'21'
'cpuid level'___________________________:'13'
'flags'_________________________________:'fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt pdpe1gb rdtscp lm constant_tsc rep_good nopl nonstop_tsc extd_apicid aperfmperf eagerfpu pni pclmulqdq monitor ssse3 cx16 sse4_1 sse4_2 popcnt aes xsave avx lahf_lm cmp_legacy svm extapic cr8_legacy abm sse4a misalignsse 3dnowprefetch osvw ibs xop skinit wdt lwp fma4 nodeid_msr topoext perfctr_core perfctr_nb cpb hw_pstate vmmcall arat npt lbrv svm_lock nrip_save tsc_scale vmcb_clean flushbyasid decodeassists pausefilter pfthreshold'
'fpu'___________________________________:'yes'
'fpu_exception'_________________________:'yes'
'initial apicid'________________________:'1'
'microcode'_____________________________:'0x6000626'
'model'_________________________________:'1'
'model name'____________________________:'AMD FX(tm)-4100 Quad-Core Processor'
'physical id'___________________________:'0'
'power management'______________________:'ts ttp tm 100mhzsteps hwpstate cpb'
'processor'_____________________________:'1'
'siblings'______________________________:'4'
'stepping'______________________________:'2'
'vendor_id'_____________________________:'AuthenticAMD'
'wp'____________________________________:'yes'

2
Pandas被设计用于处理和处理二维数据(您会将其放入电子表格中的数据)。因为“ask_queue”和“bid_queue”不是单维系列而是二维数组,所以您不能(轻易地)将它们推入Pandas数据帧中。在这种情况下,您必须使用其他库,例如xarray:http://xarray.pydata.org/。"Original Answer"翻译成"最初的回答"。
import xarray as xr

# Creating variables, first argument is the name of the dimensions
last_price = xr.Variable("millis", data["last_price"])
ask_queue = xr.Variable(("millis", "levels"), data["ask_queue"])
bid_queue = xr.Variable(("millis", "levels"), data["bid_queue"])

# Putting the variables in a dataset, the multidimensional equivalent of a Pandas
# dataframe
ds = xr.Dataset({"last_price": last_price, "ask_queue": ask_queue,
                 "bid_queue": bid_queue}, coords={"millis": data["millis"]})

# Computing the average of ask_queue level 5~10
ds["ask_queue"][{"levels": slice(5,10)}].mean(axis=1)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接