import atexit
import socket
import time
from typing import Dict, Optional, Tuple


class IQFeedMarketDepthProcessor:
    """Maintain a price-level depth table for one symbol from an IQFeed socket.

    The processor connects to a TCP endpoint, waits for the server's
    ``S,SERVER CONNECTED`` line, selects protocol 6.2, subscribes to price
    level updates for ``symbol`` and stores the size reported for every
    ``(symbol, side, price)`` combination in ``price_levels``.
    """

    def __init__(
        self,
        host: str = '127.0.0.1',
        port: int = 9200,
        symbol: str = '',
    ) -> None:
        """Store connection settings; no network activity happens here.

        Args:
            host: Address of the feed server.
            port: TCP port of the feed server.
            symbol: Symbol to subscribe to once the server greets us.
        """
        self.host = host
        self.port = port
        self.symbol = symbol
        # Maps (symbol, side, price) -> most recently reported level size.
        self.price_levels: Dict[Tuple[str, str, float], int] = {}
        self.socket: Optional[socket.socket] = None

    def connect(self) -> None:
        """Open the TCP connection and register ``cleanup`` to run at exit.

        Raises:
            OSError: If the connection cannot be established. The half-built
                socket is closed and ``self.socket`` is left untouched.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except Exception:
            # Do not leak the descriptor or expose an unconnected socket.
            sock.close()
            raise
        self.socket = sock
        # Register a cleanup function to be called when the script is exiting
        atexit.register(self.cleanup)

    def send_command(self, command: str) -> None:
        """Send one command line to the server; a no-op when not connected.

        Args:
            command: Command text without a line terminator.

        Raises:
            OSError: If the underlying ``sendall`` fails.
        """
        if self.socket is None:
            return
        # Commands to the IQFeed server are issued in uppercase and
        # must be terminated with a carriage return and line feed.
        print('Sending command', command)
        self.socket.sendall((command + '\r\n').encode())

    def process_market_depth_message(self, message: str) -> None:
        """Record the level size carried by a price level message.

        Only messages whose first comma-separated field is ``'7'`` or ``'8'``
        (Price Level Summary / Price Level Update) are used; anything else is
        ignored. Fields 1-4 are read as symbol, side, price and level size.
        Field 5 (order count, per the original author's note) is not used.

        Args:
            message: One complete message line with whitespace stripped.

        Raises:
            IndexError: If a type 7/8 message has fewer than five fields.
            ValueError: If the price or level size is not numeric.
        """
        fields = message.split(',')
        if fields[0] not in ('7', '8'):
            return
        print('Processing message', message)
        symbol = fields[1]
        side = fields[2]
        price = float(fields[3])
        level_size = int(fields[4])
        # The key for price_levels dictionary is a tuple of symbol, side,
        # and price
        self.price_levels[(symbol, side, price)] = level_size

    def _handle_message(self, message: str) -> None:
        """Dispatch one stripped, non-empty line received from the server."""
        if 'S,SERVER CONNECTED' in message:
            # The greeting is the cue to pick the protocol and subscribe.
            # Use this instance, not whatever global happens to exist.
            self.send_command('S,SET PROTOCOL,6.2')
            self.send_command(f'WPL,{self.symbol},10')
            return
        self.process_market_depth_message(message)

    def process_messages(self) -> None:
        """Read from the socket and process messages until the peer closes.

        Returns immediately when not connected. Bytes are buffered until a
        full newline-terminated line is available, so a message (or a
        multi-byte character) split across two ``recv`` calls is reassembled
        before decoding. A failure while handling one message is reported and
        does not discard the other messages received with it. A socket error
        raised by ``recv`` is reported and retried after five seconds.
        """
        if self.socket is None:
            return
        pending = b''
        while True:
            try:
                # Receive data from the socket
                chunk = self.socket.recv(4096)
            except socket.error as e:
                print(f'Socket error: {e}')
                time.sleep(5)  # Wait and try again
                continue
            if not chunk:
                # recv() returning no bytes means the peer closed the
                # connection; retrying would spin forever.
                print('Connection closed by server')
                return
            pending += chunk
            # Everything before the last newline is complete; the tail
            # (possibly empty) is held for the next recv().
            *lines, pending = pending.split(b'\n')
            for raw_line in lines:
                message = raw_line.decode('utf-8', errors='replace').strip()
                if not message:
                    continue
                print(message)
                try:
                    self._handle_message(message)
                except socket.error as e:
                    print(f'Socket error: {e}')
                except Exception as e:
                    # Best effort: one bad message must not stall the feed.
                    print(f'Unexpected error: {e}')

    def cleanup(self) -> None:
        """Unsubscribe and close the socket; safe to call more than once."""
        if self.socket is None:
            return
        try:
            # Send the 'RPL' command to stop receiving updates
            self.send_command(f'RPL,{self.symbol}')
        except socket.error as e:
            # The connection may already be dead; still close the socket.
            print(f'Socket error: {e}')
        finally:
            # Close the socket
            self.socket.close()
            self.socket = None


if __name__ == '__main__':
    processor = IQFeedMarketDepthProcessor(symbol='@NQM24')
    processor.connect()
    processor.process_messages()