Pandas DataFrame和Series - IB TWS历史数据

4

我正在尝试将pandas模块应用到我的代码中,以便重新组织从IB TWS服务器接收到的消息。

代码如下:

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract


class MyWrapper(EWrapper):

    def nextValidId(self, orderId:int):
        print("Setting nextValidOrderId: %d", orderId)
        self.nextValidOrderId = orderId
        self.start()

    def historicalData(self, reqId, bar):
        print("HistoricalData. ", reqId, "Date:", bar.date, "Open:", bar.open, "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume, "Average:", bar.average, "Count:", bar.barCount)

    def historicalDataUpdate(self, reqId, bar):
        print("HistoricalDataUpdate. ", reqId, "Date:", bar.date, "Open:", bar.open, "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume, "Average:", bar.average, "Count:", bar.barCount)

    def error(self, reqId, errorCode, errorString):
        print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

    def start(self):
        queryTime = ""

        contract = Contract()
        contract.secType = "STK"
        contract.symbol = "NIO"
        contract.currency = "USD"
        contract.exchange = "SMART"

        app.reqHistoricalData(1, contract, queryTime, "1 D", "5 secs", "TRADES", 0, 1, True, [])

app = EClient(MyWrapper())
app.connect("127.0.0.1", 7496, clientId=123)
app.run()

这段代码获取给定股票的历史数据,然后返回最新的更新。

我面临的问题是返回的消息被组织成如下形式:

HistoricalDataUpdate.  1 Date: 20200708  08:31:00 Open: 14.17 High: 14.17 Low: 14.17 Close: 14.17 Volume: -1 Average: 14.15 Count: -1

当我尝试以重新组织的方式检索数据时,例如

HistoricalDataUpdate.  1 Date:            Open:  High:  Low:   Close:  Volume:  Average:  Count:
                       20200708  08:31:00 14.17  14.17  14.17  14.17   -1       14.15     -1

希望得到帮助,谢谢。

2个回答

3
回调函数会提供 ibapi.common.BarData,您可以读取它的变量来获取类似 {date:..., open:123...} 的字典。
Pandas 可以从字典列表创建数据帧,因此将它们存储在列表中。
也许您想要使用日期作为索引,Pandas 也可以做到,令人惊讶的是它能够读取这种格式。
完成后,您可以将数据保存在 CSV 文件中。
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd

class MyWrapper(EWrapper):
    def __init__(self):
        self.data = []
        self.df=None
        
    def nextValidId(self, orderId:int):
        print("Setting nextValidOrderId: %d", orderId)
        self.nextValidOrderId = orderId
        self.start()

    def historicalData(self, reqId, bar):
        self.data.append(vars(bar));
        
    def historicalDataUpdate(self, reqId, bar):
        line = vars(bar)
        # pop date and make it the index, add rest to df
        # will overwrite last bar at that same time
        self.df.loc[pd.to_datetime(line.pop('date'))] = line
        
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        print("HistoricalDataEnd. ReqId:", reqId, "from", start, "to", end)
        self.df = pd.DataFrame(self.data)
        self.df['date'] = pd.to_datetime(self.df['date'])
        self.df.set_index('date', inplace=True)
        
    def error(self, reqId, errorCode, errorString):
        print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

    def start(self):
        queryTime = ""
        
        # so everyone can get data use fx
        fx = Contract()
        fx.secType = "CASH" 
        fx.symbol = "USD"
        fx.currency = "JPY"
        fx.exchange = "IDEALPRO"
        
        # setting update to 1 minute still sends an update every tick? but timestamps are 1 min
        # I don't think keepUpToDate sends a realtimeBar every 5 secs, just updates the last bar.
        app.reqHistoricalData(1, fx, queryTime, "1 D", "1 min", "MIDPOINT", 0, 1, True, [])

wrap = MyWrapper()        
app = EClient(wrap)
app.connect("127.0.0.1", 7497, clientId=123)

#I just use this in jupyter so I can interact with df
import threading
threading.Thread(target = app.run).start()

#this isn't needed in jupyter, just run another cell
import time
time.sleep(300) # in 5 minutes check the df and close

print(wrap.df)
wrap.df.to_csv("myfile.csv")#save in file
app.disconnect()

#in jupyter to show plot
%matplotlib inline 
wrap.df.close.plot()

我使用Jupyter Notebook,因此添加了线程,以便仍然可以进行交互。

这里是一些输出。先打印从historicalDataEnd收到和打印的第一个数据。使用带有日期时间索引的变量创建DataFrame,以便可以按时间添加条形图。

历史数据结束。ReqId: 1 from 20200707 14:23:19 to 20200708 14:23:19

然后在300秒后,我打印出DataFrame。检查ohlc是否合乎逻辑,并注意每分钟会有一个新的bar。我假设14:28 bar只有前19秒,因为我的五分钟(300秒)从14:23:19开始。这正是您想要并期望保持图表最新的行为。

2020-07-08 14:24:00  107.231  107.236  107.231  107.233     -1       -1   
2020-07-08 14:25:00  107.233  107.234   107.23  107.232     -1       -1   
2020-07-08 14:26:00  107.232  107.232  107.225  107.232     -1       -1   
2020-07-08 14:27:00  107.232  107.239  107.231  107.239     -1       -1   
2020-07-08 14:28:00  107.239  107.239  107.236  107.236     -1       -1   

你可以看到它获取所有的条形图(仅限图形中的收盘价)并保持最新状态。 JPY

1
如果您知道列名,甚至可以在请求之前创建数据框。我展示的是“keepUpToDate=True”的用例。它用于获取一些历史记录并使其保持最新状态。如果这不是您的用例,那么也许这不是您需要的数据请求。 - brian
1
阅读文档,http://interactivebrokers.github.io/tws-api/connection.html#connect - 只需等待nextValidId才能进行请求。你不需要生成orderId,它只是下一个orderId的提醒。每个订单都必须增加,但你可以以任何方式跟踪它们。 - brian
我得到的不是更新,而是“开盘价、最高价、最低价、收盘价、成交量、条数、平均值”。例如,我的输出是:2021-06-18 16:16:00,3.2185,3.2185,3.2185,3.2185,-1,-1,-1.0 2021-06-18 16:17:00,3.2185,3.2185,3.2185,3.2185,-1,-1,-1.0 2021-06-18 16:18:00,开盘价、最高价、最低价、收盘价、成交量、条数、平均值 2021-06-18 16:19:00,开盘价、最高价、最低价、收盘价、成交量、条数、平均值 2021-06-18 16:20:00,开盘价、最高价、最低价、收盘价、成交量、条数、平均值 - spitz
@spitz 你应该添加一个检查,以查看是否收到了有效的酒吧。市场可能已经关闭了。 - brian
@brian 看起来是一些数据框键查找速度问题... 对我来说,以下内容有效:d = pd.to_datetime(line.pop('date'))self.df.loc[d] = line.values() - spitz
显示剩余9条评论

0
  1. 这真的是ETL(提取,转换,加载)
  2. 我可以看到每个数据元素都是形式为名称:。使用此作为正则表达式获取所有名称标记
  3. 使用此列表根据标记位置和下一个标记将每个标记提取到字典中
  4. 获取第一个标记之前的数据标签
  5. 最后将其转换为pandas数据框
text= "HistoricalDataUpdate.  1 Date: 20200708  08:31:00 Open: 14.17 High: 14.17 Low: 14.17 Close: 14.17 Volume: -1 Average: 14.15 Count: -1"
tokens = re.findall("([A-Z][a-z]*:)", text)
json = {t:text[re.search(tokens[i], text).span(0)[1]:re.search(tokens[i+1], text).span(0)[0]] 
        if i+1<len(tokens) 
        else text[re.search(tokens[i], text).span(0)[1]:] 
        for i,t in enumerate(tokens)}
json = {"label":text[:re.search(tokens[0], text).span(0)[0]], **json}
df = pd.DataFrame([json])
df

输出

    label   Date:   Open:   High:   Low:    Close:  Volume: Average:    Count:
0   HistoricalDataUpdate. 1 20200708 08:31:00   14.17   14.17   14.17   14.17   -1  14.15   -1



这是您的代码吗?为什么要打印出如此难以解析的内容?def historicalData(self, reqId, bar): print("HistoricalData. ", reqId, "Date:", bar.date, "Open:", bar.open, "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume, "Average:", bar.average, "Count:", bar.barCount) - Rob Raymond
这些是我找到的用于定义每个请求的名称。在你提供的标记为“text”的示例的第一行中,我应该放什么? “HistoricalDataUpdate. 1 Date: 20200708 08:31:00 Open: 14.17 High: 14.17 Low: 14.17 Close: 14.17 Volume: -1 Average: 14.15 Count: -1”只是代码打印的输出示例,因此每个值都会根据接收到的数据而不同。 - dinosaurslayer
我建议将其保存在数据结构中,而不是将其发送到stdout并尝试反向工程化print()语句。 bar看起来像一个相当不错的数据结构。 - Rob Raymond
我该如何解析bar?我已经将我的打印语句更改为print("HistoricalData. ReqId:", reqId, "BarData:", bar)print("HistoricalDataUpdate. ReqId:", reqId, "BarData:", bar) - dinosaurslayer
2
正如其他答案所建议的那样,print("HistoricalData. ", reqId, "Date:", bar.date, "Open:", bar.open, "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume, "Average:", bar.average, "Count:", bar.barCount) 可以改为创建一个字典并将其附加到数据框中。 - Rob Raymond
我需要将BarData中的数据附加到定义每个键吗? - dinosaurslayer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接