"""Backtest short iron condors against per-day historical option quote files."""

import logging
import os
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import pandas as pd
from dotenv import load_dotenv

from .credit_target_strategy import CreditTargetStrategy
from .delta_target_strategy import DeltaTargetStrategy
from .option_spread_strategy import OptionSpreadStrategy
from .option_type import OptionType

load_dotenv()

logging.basicConfig(level=logging.WARNING)

FEES_PER_CONTRACT = 0.80
MARKET_CLOSE = '16:00:00'
MARKET_OPEN = '09:35:00'
OPTION_DATA_DIRECTORY = os.getenv('OPTION_DATA_DIRECTORY')
STRIKE_MULTIPLE = 5.0

# Extra amount charged against the premium whenever a spread is stopped out.
# TODO: Make this configurable.
STOP_OUT_SLIPPAGE = 0.10

# Quote timestamps after the default exit time that the bar loop ignores.
POST_CLOSE_QUOTE_TIMES = ('16:05:00', '16:10:00', '16:15:00')

# Column order of the frame returned by backtest_iron_condor.
RESULT_COLUMNS = [
    'Date', 'Symbol', 'Strategy', 'Entry Time', 'Exit Time',
    'Spreads', 'Profit', 'Cumulative Profit',
]


@dataclass
class BacktestResult:
    """Outcome of backtesting one iron condor on one data file."""

    # First 10 characters of the first quote timestamp in the spread history.
    date: str
    # '<date> <strategy entry time>'.
    entry_time: str
    # '<date> <time of the last stop-out>', or '<date> 16:00:00' if none.
    exit_time: str
    # [call_spread_details, put_spread_details] dicts (legs/open/high/low/close).
    spreads: list
    trade_entered: bool
    # Net dollars for this trade after fees, commissions and stop-out fees.
    trade_pnl: float
    # Overwritten by backtest_iron_condor with the running cumulative PnL.
    profit: float
    # Sum of the call and put spread entry prices (per share, after slippage).
    credit: float
    # Largest and smallest running open profit in dollars seen during the day.
    mfe: float
    mae: float


# Metrics
# NOTE: appended to by _backtest_iron_condor in whichever process runs it;
# when run through the ProcessPoolExecutor in backtest_iron_condor that is a
# worker process, so the parent's copies of these lists are not updated.
dates = []
max_drawdowns = []
max_profits = []
wins = []
exit_times = []


def _find_long_strike(opening_quotes: pd.DataFrame, short_strike, option_strat) -> Optional[float]:
    """Return the nearest listed long strike at or beyond the spread width.

    Starts at ``short_strike`` +/- ``spread_width`` (above for calls, below
    for puts) and walks further away in ``STRIKE_MULTIPLE`` steps until a
    strike present in ``opening_quotes`` is found. Returns ``None`` once the
    search leaves the range of listed strikes, so it always terminates.
    """
    is_call = option_strat.option_type == OptionType.CALL
    long_strike = short_strike + (option_strat.spread_width if is_call else -option_strat.spread_width)
    increment = STRIKE_MULTIPLE if is_call else -STRIKE_MULTIPLE

    listed_strikes = set(opening_quotes['strike'])
    if not listed_strikes:
        return None
    lowest_strike = min(listed_strikes)
    highest_strike = max(listed_strikes)

    # Sometimes the strike we're interested in doesn't exist. Find the nearest one that does.
    while long_strike not in listed_strikes:
        long_strike = long_strike + increment
        if long_strike < lowest_strike or long_strike > highest_strike:
            return None
    return long_strike


def _build_spread_history(historical_option_data: pd.DataFrame, short_strike, long_strike, option_type_value) -> pd.DataFrame:
    """Join the full-day quotes of the short and long legs on quote_datetime.

    Columns of the short leg get the suffix ``_short_strike`` and columns of
    the long leg get ``_long_strike``; the result is indexed by quote_datetime.
    """
    is_requested_type = historical_option_data['option_type'] == option_type_value
    short_strike_history = historical_option_data[
        (historical_option_data['strike'] == short_strike) & is_requested_type
    ].set_index('quote_datetime')
    long_strike_history = historical_option_data[
        (historical_option_data['strike'] == long_strike) & is_requested_type
    ].set_index('quote_datetime')
    return short_strike_history.join(
        long_strike_history,
        lsuffix='_short_strike',
        rsuffix='_long_strike',
        on='quote_datetime'
    )


def _opening_quotes(historical_option_data: pd.DataFrame, trade_entry_time: str) -> pd.DataFrame:
    """Return a copy of the rows quoted at the entry time on the file's date.

    The date is taken from the first 10 characters of the first row's
    quote_datetime. The result is empty when the data is empty or has no
    quote at that timestamp.
    """
    if historical_option_data.empty:
        return historical_option_data.copy()
    current_date = historical_option_data.iloc[0]['quote_datetime'][:10]
    entry_timestamp = current_date + ' ' + trade_entry_time
    return historical_option_data[historical_option_data['quote_datetime'] == entry_timestamp].copy()


def get_spread_history_credit(historical_option_data: pd.DataFrame, option_strat: CreditTargetStrategy) -> Optional[pd.DataFrame]:
    """Build the day's quote history for a spread chosen by target credit.

    The short leg is the contract of ``option_strat.option_type`` whose bid at
    the entry time is closest to ``option_strat.credit_target``.

    Args:
        historical_option_data: One day of option quotes.
        option_strat: Strategy supplying entry time, option type, credit
            target and spread width.

    Returns:
        The joined short/long leg history, or ``None`` when there are no
        quotes at the entry time, none of the requested option type, or no
        listed long strike beyond the short strike.
    """
    opening_quotes = _opening_quotes(historical_option_data, option_strat.trade_entry_time)
    if opening_quotes.empty:
        return None

    opening_quotes_by_contract_type = opening_quotes[
        opening_quotes['option_type'] == option_strat.option_type.value
    ].copy()
    if opening_quotes_by_contract_type.empty:
        return None

    opening_quotes_by_contract_type['credit_diff'] = (
        opening_quotes_by_contract_type['bid'] - option_strat.credit_target
    ).abs()
    short_contract = opening_quotes_by_contract_type.loc[opening_quotes_by_contract_type['credit_diff'].idxmin()]
    short_strike = short_contract['strike']
    logging.info('Short Strike: %s', short_strike)

    long_strike = _find_long_strike(opening_quotes_by_contract_type, short_strike, option_strat)
    if long_strike is None:
        return None
    logging.info('Long Strike: %s', long_strike)

    return _build_spread_history(historical_option_data, short_strike, long_strike, option_strat.option_type.value)


def get_spread_history(historical_option_data: pd.DataFrame, option_strat: DeltaTargetStrategy) -> Optional[pd.DataFrame]:
    """Build the day's quote history for a spread chosen by target delta.

    The short leg is the contract whose delta at the entry time is closest to
    ``option_strat.delta_target``. The opening quotes are not filtered by
    option type for this selection or for the long-strike lookup.

    Args:
        historical_option_data: One day of option quotes.
        option_strat: Strategy supplying entry time, option type, delta
            target and spread width.

    Returns:
        The joined short/long leg history, or ``None`` when there are no
        quotes at the entry time or no listed long strike beyond the short
        strike.
    """
    opening_quotes = _opening_quotes(historical_option_data, option_strat.trade_entry_time)
    if opening_quotes.empty:
        return None

    opening_quotes['delta_diff'] = (opening_quotes['delta'] - option_strat.delta_target).abs()
    short_contract = opening_quotes.loc[opening_quotes['delta_diff'].idxmin()]
    short_strike = short_contract['strike']
    logging.info('Short Strike: %s', short_strike)

    long_strike = _find_long_strike(opening_quotes, short_strike, option_strat)
    if long_strike is None:
        return None
    logging.info('Long Strike: %s', long_strike)

    return _build_spread_history(historical_option_data, short_strike, long_strike, option_strat.option_type.value)


def _mid_spread_price(quote):
    """Return short-leg mid minus long-leg mid for one joined quote row."""
    short_mid = (quote['ask_short_strike'] + quote['bid_short_strike']) / 2.0
    long_mid = (quote['ask_long_strike'] + quote['bid_long_strike']) / 2.0
    return short_mid - long_mid


def _bar_spread_price(quote, field: str):
    """Return the spread price from the bar's ``field`` ('high' or 'low').

    Uses short minus long of that field when the short leg's value is the
    larger one; otherwise falls back to the bid/ask mid price.
    """
    short_value = quote[f'{field}_short_strike']
    long_value = quote[f'{field}_long_strike']
    if short_value > long_value:
        return short_value - long_value
    return _mid_spread_price(quote)


def _apply_entry_slippage(price):
    """Round a price above 0.05 down to a multiple of 0.05."""
    if price > 0.05:
        return price - (price % 0.05)
    return price


def _record_spread_price(details: dict, price) -> None:
    """Fold ``price`` into the high/low/close entries of a spread details dict.

    ``None`` means "not set yet"; a stored 0.0 is a real value and is kept.
    """
    previous_high = float('-inf') if details['high'] is None else details['high']
    previous_low = float('inf') if details['low'] is None else details['low']
    details['high'] = max(previous_high, price)
    details['low'] = min(previous_low, price)
    details['close'] = price


def _new_spread_details(entry, contract_type: str, open_price) -> dict:
    """Create the legs/open/high/low/close dict for one vertical spread."""
    return {
        "legs": [
            {"action": "SELL", "strike": entry['strike_short_strike'], "type": contract_type},
            {"action": "BUY", "strike": entry['strike_long_strike'], "type": contract_type}
        ],
        "open": open_price,
        "high": None,
        "low": None,
        "close": None
    }


def _backtest_iron_condor(
    historical_data_file: str,
    call_spread_strategy: OptionSpreadStrategy,
    put_spread_strategy: OptionSpreadStrategy
) -> Optional[BacktestResult]:
    """Backtest one iron condor (short call spread + short put spread) on one file.

    The type of ``call_spread_strategy`` decides how both spreads are chosen
    (credit target vs. delta target). Entry time and contract count are also
    read from the call strategy. After the entry bar, every bar is priced,
    each side is checked against its stop
    (``(stop_loss_multiple + 1) * entry price``), and the running open profit
    feeds the max profit / max drawdown figures.

    Args:
        historical_data_file: Path of the CSV with one day of option quotes.
        call_spread_strategy: Strategy for the call side.
        put_spread_strategy: Strategy for the put side.

    Returns:
        The trade result, or ``None`` when either spread history could not be
        built.

    Side effects:
        Prints the file name and appends to the module-level metric lists.
    """
    print('Processing File:', historical_data_file)
    historical_option_data = pd.read_csv(historical_data_file)

    if isinstance(call_spread_strategy, CreditTargetStrategy):
        build_history = get_spread_history_credit
    else:
        build_history = get_spread_history
    call_spread_history = build_history(historical_option_data, call_spread_strategy)
    put_spread_history = build_history(historical_option_data, put_spread_strategy)

    entry_time = call_spread_strategy.trade_entry_time

    if call_spread_history is None or put_spread_history is None:
        # This can happen when the market closes early for the day.
        logging.warning('No spread history found in %s for %s', historical_data_file, entry_time)
        return None

    current_date = call_spread_history.iloc[0].name[:10]
    entry_timestamp = current_date + ' ' + entry_time

    call_spread_entry = call_spread_history.loc[entry_timestamp]
    put_spread_entry = put_spread_history.loc[entry_timestamp]

    # Calculate entry slippage.
    original_call_spread_price = _apply_entry_slippage(_mid_spread_price(call_spread_entry))
    logging.info('Original Call Spread Price: %s', original_call_spread_price)
    original_put_spread_price = _apply_entry_slippage(_mid_spread_price(put_spread_entry))
    logging.info('Original Put Spread Price: %s', original_put_spread_price)

    premium_received = original_call_spread_price + original_put_spread_price

    call_spread_details = _new_spread_details(call_spread_entry, "CALL", original_call_spread_price)
    put_spread_details = _new_spread_details(put_spread_entry, "PUT", original_put_spread_price)

    # Price at which each side is considered stopped out.
    call_stop_price = original_call_spread_price * (call_spread_strategy.stop_loss_multiple + 1)
    put_stop_price = original_put_spread_price * (put_spread_strategy.stop_loss_multiple + 1)

    trades_entered = False
    call_spread_stopped_out = False
    put_spread_stopped_out = False
    max_profit = 0.0
    max_drawdown = 0.0
    exit_time = '16:00:00'
    current_call_spread_price = original_call_spread_price
    current_put_spread_price = original_put_spread_price

    # The two histories are walked positionally in lockstep; timestamps are
    # always read from the call-side row.
    for i in range(len(call_spread_history)):
        call_spread = call_spread_history.iloc[i]
        put_spread = put_spread_history.iloc[i]

        if call_spread.name.endswith(entry_time):
            trades_entered = True
            continue

        if not trades_entered:
            continue

        if call_spread.name.endswith(POST_CLOSE_QUOTE_TIMES):
            continue

        # Price both sides from the bar highs for the stop check.
        current_call_spread_price = _bar_spread_price(call_spread, 'high')
        current_put_spread_price = _bar_spread_price(put_spread, 'high')

        _record_spread_price(call_spread_details, current_call_spread_price)
        _record_spread_price(put_spread_details, current_put_spread_price)

        if not call_spread_stopped_out and current_call_spread_price >= call_stop_price:
            premium_received -= call_stop_price
            call_spread_details['close'] = call_stop_price + STOP_OUT_SLIPPAGE
            # Calculate exit slippage.
            premium_received -= STOP_OUT_SLIPPAGE
            call_spread_stopped_out = True
            exit_time = call_spread.name[-8:]
            logging.info('Call Spread Stopped Out')

        if not put_spread_stopped_out and current_put_spread_price >= put_stop_price:
            premium_received -= put_stop_price
            premium_received -= STOP_OUT_SLIPPAGE
            put_spread_details['close'] = put_stop_price + STOP_OUT_SLIPPAGE
            put_spread_stopped_out = True
            exit_time = call_spread.name[-8:]
            logging.info('Put Spread Stopped Out')

        # For the open-profit estimate, re-price the cheaper side from the
        # bar lows while at least one side is still open.
        if not (call_spread_stopped_out and put_spread_stopped_out):
            if current_call_spread_price > current_put_spread_price:
                current_put_spread_price = _bar_spread_price(put_spread, 'low')
            else:
                current_call_spread_price = _bar_spread_price(call_spread, 'low')

        # A stopped-out side is pinned at its stop price and the opposite
        # side is re-priced from the bar highs. When both sides are stopped
        # the put block runs last, so its assignments win.
        if call_spread_stopped_out:
            current_call_spread_price = call_stop_price
            current_put_spread_price = _bar_spread_price(put_spread, 'high')

        if put_spread_stopped_out:
            current_put_spread_price = put_stop_price
            current_call_spread_price = _bar_spread_price(call_spread, 'high')

        current_profit = (
            (original_call_spread_price - current_call_spread_price)
            + (original_put_spread_price - current_put_spread_price)
        )
        current_profit_dollars = current_profit * call_spread_strategy.number_of_contracts * 100

        if current_profit_dollars > max_profit:
            max_profit = current_profit_dollars
        if current_profit_dollars < max_drawdown:
            max_drawdown = current_profit_dollars

    # Sides still open after the last bar are charged their final price,
    # unless it is 0.05 or less.
    if not call_spread_stopped_out and current_call_spread_price > 0.05:
        premium_received -= current_call_spread_price
    if not put_spread_stopped_out and current_put_spread_price > 0.05:
        premium_received -= current_put_spread_price

    number_of_contracts = call_spread_strategy.number_of_contracts

    # It costs money to get stopped out.
    stop_out_fees = 0.0
    if call_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)
    if put_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)

    fees = 4 * FEES_PER_CONTRACT * number_of_contracts
    commissions = 4 * number_of_contracts
    premium_received = (premium_received * number_of_contracts * 100) - (fees + commissions) - stop_out_fees

    dates.append(current_date)
    max_drawdowns.append(max_drawdown)
    max_profits.append(max_profit)
    wins.append(bool(premium_received > 0))
    exit_times.append(exit_time)

    result = BacktestResult(
        date=current_date,
        entry_time=f'{current_date} {entry_time}',
        exit_time=f'{current_date} {exit_time}',
        spreads=[call_spread_details, put_spread_details],
        trade_entered=True,
        trade_pnl=premium_received,
        profit=0.0,  # TODO: Calculated elsewhere. Clean this up.
        credit=original_call_spread_price + original_put_spread_price,
        mfe=max_profit,
        mae=max_drawdown
    )

    logging.info('Premium Received: %f', premium_received)
    return result


def backtest_iron_condor(
    backtest_name: str,
    call_spread_strategy: OptionSpreadStrategy,
    put_spread_strategy: OptionSpreadStrategy,
    start_date: datetime,
    end_date: datetime
) -> pd.DataFrame:
    """Backtest an iron condor over every daily file between two dates.

    Looks in ``OPTION_DATA_DIRECTORY/<year>/`` for files named
    ``YYYY-MM-DD.csv``, runs ``_backtest_iron_condor`` on each one in a
    process pool, and collects the results in chronological order.

    Args:
        backtest_name: Value written to the 'Strategy' column.
        call_spread_strategy: Strategy for the call side.
        put_spread_strategy: Strategy for the put side.
        start_date: First day to include; only its calendar date is used.
        end_date: Last day to include; only its calendar date is used.

    Returns:
        One row per completed trade with the columns in ``RESULT_COLUMNS``.
        'Profit' is the trade's own PnL and 'Cumulative Profit' the running
        total.
    """
    total_premium_received = 0.0
    total_trades = 0.0
    total_wins = 0.0

    # Compare calendar dates so a start/end carrying a time of day still
    # includes that day's file.
    first_day = start_date.date()
    last_day = end_date.date()

    futures = []
    with ProcessPoolExecutor(max_workers=10) as executor:
        for year in range(start_date.year, end_date.year + 1):
            year_directory = os.path.join(OPTION_DATA_DIRECTORY, str(year))

            # os.listdir returns names in arbitrary order; sort so the
            # cumulative profit below is accumulated chronologically.
            for file in sorted(os.listdir(year_directory)):
                historical_data_file = os.path.join(year_directory, file)
                if os.path.isdir(historical_data_file) or not file.endswith('.csv'):
                    continue

                # Assuming file format 'YYYY-MM-DD.csv'.
                file_date_str = os.path.splitext(file)[0]
                try:
                    file_day = datetime.strptime(file_date_str, '%Y-%m-%d').date()
                except ValueError:
                    logging.warning('Skipping file with unexpected name: %s', historical_data_file)
                    continue

                if file_day < first_day or file_day > last_day:
                    continue

                logging.info('Processing File: %s', historical_data_file)
                future = executor.submit(
                    _backtest_iron_condor,
                    historical_data_file,
                    call_spread_strategy,
                    put_spread_strategy
                )
                futures.append(future)

    backtest_results = []
    for future in futures:
        backtest_result = future.result()
        if not backtest_result:
            continue

        total_premium_received += backtest_result.trade_pnl
        backtest_result.profit = total_premium_received
        backtest_results.append(backtest_result)

        if backtest_result.trade_entered:
            total_trades += 1
            if backtest_result.trade_pnl > 0.0:
                total_wins += 1

    logging.info('Overall PnL: %f', total_premium_received)
    logging.info('Win Rate: %f', (total_wins / total_trades) if total_trades > 0 else 0.0)
    logging.info('Average Premium Received: %f', (total_premium_received / total_trades) if total_trades > 0 else 0.0)

    # TODO: Either look up the symbol in the historical option data or have the client provide it.
    backtest_results = pd.DataFrame(
        [{
            'Date': result.date,
            'Symbol': 'SPX',
            'Strategy': backtest_name,
            'Entry Time': result.entry_time,
            'Exit Time': result.exit_time,
            'Spreads': result.spreads,
            'Profit': result.trade_pnl,
            'Cumulative Profit': result.profit
        } for result in backtest_results],
        columns=RESULT_COLUMNS
    )

    return backtest_results