Add the ability to stream historical candles in addition to quotes

This commit is contained in:
moshferatu 2024-07-06 05:10:45 -07:00
parent cee81dd3a4
commit 61f477443b

View File

@ -3,6 +3,8 @@ import json
import requests import requests
import websockets import websockets
from datetime import datetime
class Tastytrade: class Tastytrade:
def __init__(self, username: str, password: str = None) -> None: def __init__(self, username: str, password: str = None) -> None:
@ -73,7 +75,7 @@ class Tastytrade:
response = await ws.recv() response = await ws.recv()
return json.loads(response) return json.loads(response)
async def stream_market_data(self, symbols: list) -> None: async def stream_market_data(self, subscription_message: dict) -> None:
token_response = self.get('/api-quote-tokens').json() token_response = self.get('/api-quote-tokens').json()
api_quote_token = token_response['data']['token'] api_quote_token = token_response['data']['token']
dxlink_url = token_response['data']['dxlink-url'] dxlink_url = token_response['data']['dxlink-url']
@ -83,12 +85,6 @@ class Tastytrade:
setup_response = await self.setup_connection(ws) setup_response = await self.setup_connection(ws)
auth_response = await self.authenticate(ws, api_quote_token) auth_response = await self.authenticate(ws, api_quote_token)
# For some reason, the auth response is always unauthorized yet subsequent requests work.
# if auth_response.get('state') != 'AUTHORIZED':
# print('Authorization failed.')
# return
# Request new channel for Quote (and Greeks) events
channel_request = { channel_request = {
'type': 'CHANNEL_REQUEST', 'type': 'CHANNEL_REQUEST',
'channel': 1, 'channel': 1,
@ -98,20 +94,35 @@ class Tastytrade:
} }
} }
await ws.send(json.dumps(channel_request)) await ws.send(json.dumps(channel_request))
channel_response = await ws.recv() await ws.recv()
subscription_message = {
'type': 'FEED_SUBSCRIPTION',
'channel': 1,
'add': [{'symbol': symbol, 'type': 'Quote'} for symbol in symbols]
}
await ws.send(json.dumps(subscription_message)) await ws.send(json.dumps(subscription_message))
async for message in ws: async for message in ws:
data = json.loads(message) data = json.loads(message)
if data["type"] == "FEED_DATA": if data["type"] == "FEED_DATA":
# TODO: Have the caller provide a callback function to handle the data. # TODO: Have the caller provide a callback function to handle the data.
print(data) print(data)
def start_streaming(self, symbols: list): def create_quote_subscription(self, symbols: list) -> dict:
asyncio.get_event_loop().run_until_complete(self.stream_market_data(symbols)) return {
'type': 'FEED_SUBSCRIPTION',
'channel': 1,
'add': [{'symbol': symbol, 'type': 'Quote'} for symbol in symbols]
}
def create_historical_data_subscription(self, symbol: str, period: int, type: str, from_time: int) -> dict:
candle_symbol = f'{symbol}{{={period}{type}}}'
return {
'type': 'FEED_SUBSCRIPTION',
'channel': 1,
'add': [{'symbol': candle_symbol, 'type': 'Candle', 'fromTime': from_time}]
}
def stream_quotes(self, symbols: list):
subscription_message = self.create_quote_subscription(symbols)
asyncio.get_event_loop().run_until_complete(self.stream_market_data(subscription_message))
def stream_historical_data(self, symbol: str, period: int, type: str, from_time: datetime):
from_time_unix = int(from_time.timestamp())
subscription_message = self.create_historical_data_subscription(symbol, period, type, from_time_unix)
asyncio.get_event_loop().run_until_complete(self.stream_market_data(subscription_message))