From ec54fca78a555fafcaa4e22b34e0ed7d85462db2 Mon Sep 17 00:00:00 2001 From: moshferatu Date: Thu, 4 Apr 2024 05:49:02 -0700 Subject: [PATCH] Add script for streaming market depth data from IQFeed --- market_depth.py | 88 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 market_depth.py diff --git a/market_depth.py b/market_depth.py new file mode 100644 index 0000000..3789593 --- /dev/null +++ b/market_depth.py @@ -0,0 +1,88 @@ +import socket +import time +import atexit + +class IQFeedMarketDepthProcessor: + def __init__(self, host = '127.0.0.1', port = 9200, symbol = ''): + self.host = host + self.port = port + self.symbol = symbol + self.price_levels = {} # Dictionary to store price level to total depth mapping + self.socket = None + + def connect(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.host, self.port)) + + # Register a cleanup function to be called when the script is exiting + atexit.register(self.cleanup) + + def send_command(self, command): + if self.socket is not None: + # Commands to the IQFeed server are issued in uppercase and + # must be terminated with a carriage return and line feed. + print('Sending command', command) + self.socket.sendall((command + '\r\n').encode()) + + def process_market_depth_message(self, message): + fields = message.split(',') + + # Check if the message is a Price Level Summary or Price Level Update message + if fields[0] in ['7', '8']: + print('Processing message', message) + symbol = fields[1] + side = fields[2] + price = float(fields[3]) + level_size = int(fields[4]) + # order_count = int(fields[5]) + + # The key for price_levels dictionary is a tuple of symbol, side, and price + key = (symbol, side, price) + + self.price_levels[key] = level_size + + def process_messages(self): + if self.socket is not None: + data = '' + + while True: + try: + # Receive data from the socket + data += self.socket.recv(4096).decode('utf-8') + messages = data.split('\n') + + # If data ends in a newline, the last message is complete; otherwise, hold it for next time + if data[-1] == '\n': + data = '' + else: + data = messages.pop() + + # Process each complete message + for message in messages: + print(message) + if not message: + continue + if 'S,SERVER CONNECTED' in message: + processor.send_command(f'S,SET PROTOCOL,6.2') + processor.send_command(f'WPL,{self.symbol},10') + continue + self.process_market_depth_message(message.strip()) + except socket.error as e: + print(f'Socket error: {e}') + time.sleep(5) # Wait and try again + except Exception as e: + print(f'Unexpected error: {e}') + time.sleep(5) # Wait and try again + + def cleanup(self): + # Send the 'RPL' command to stop receiving updates + self.send_command(f'RPL,{self.symbol}') + + # Close the socket + if self.socket is not None: + self.socket.close() + +if __name__ == '__main__': + processor = IQFeedMarketDepthProcessor(symbol = '@NQM24') + processor.connect() + processor.process_messages()