"""Backtest iron condors (a short call spread plus a short put spread) on per-day option quote CSVs."""

import logging
import os
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd
import plotly.express as px
from dotenv import load_dotenv

from .credit_target_strategy import CreditTargetStrategy
from .delta_target_strategy import DeltaTargetStrategy
from .option_spread_strategy import OptionSpreadStrategy
from .option_type import OptionType

load_dotenv()
logging.basicConfig(level=logging.WARN)

FEES_PER_CONTRACT = 0.80
MARKET_CLOSE = '16:00:00'
MARKET_OPEN = '09:35:00'
OPTION_DATA_DIRECTORY = os.getenv('OPTION_DATA_DIRECTORY')
STRIKE_MULTIPLE = 5.0

# Price granularity used for entry slippage and for deciding whether a spread is worth closing.
_MINIMUM_TICK = 0.05
# Extra debit charged when a spread is stopped out. TODO: Make this configurable.
_STOP_OUT_SLIPPAGE = 0.10
# Quote timestamps after the close that are ignored while managing the trade.
_POST_CLOSE_TIMES = ('16:05:00', '16:10:00', '16:15:00')


@dataclass
class BacktestResult:
    # Trading day as 'YYYY-MM-DD' (first 10 characters of the quote timestamp).
    date: str
    # 'YYYY-MM-DD HH:MM:SS' at which the condor was opened.
    entry_time: str
    # 'YYYY-MM-DD HH:MM:SS' of the last stop-out, or the market close if none happened.
    exit_time: str
    # [call_spread_details, put_spread_details]: legs plus open/high/low/close prices.
    spreads: list
    trade_entered: bool
    # Net dollars for the day after fees and commissions.
    trade_pnl: float
    # Running cumulative PnL; filled in by backtest_iron_condor.
    profit: float
    # Per-share credit collected at entry for both spreads combined.
    credit: float
    # Largest open profit (mfe) and largest open loss (mae) in dollars seen during the day.
    mfe: float
    mae: float


# Metrics
# NOTE(review): these are appended to inside _backtest_iron_condor, which
# backtest_iron_condor runs in worker processes; the parent process' copies
# presumably stay empty in that mode - confirm before relying on them.
dates = []
max_drawdowns = []
max_profits = []
wins = []
exit_times = []


def plot(backtest_results: pd.DataFrame, title: str):
    """Show a bar chart of cumulative profit per backtested date.

    Args:
        backtest_results: Frame with 'Date', 'Profit' and 'Cumulative Profit'
            columns, as returned by backtest_iron_condor. It is not modified.
        title: Chart title.
    """
    # Work on a copy so the caller's frame keeps its columns and plot() can be called again.
    chart_data = backtest_results.drop(columns='Profit').rename(columns={'Cumulative Profit': 'Profit'})

    # Exclude dates on which the market was closed from being plotted in order to prevent gaps on chart.
    calendar_days = pd.date_range(start=chart_data['Date'].min(), end=chart_data['Date'].max())
    excluded_dates = set(calendar_days.strftime('%Y-%m-%d')) - set(chart_data['Date'])

    chart_data['Color'] = np.where(chart_data['Profit'] >= 0, 'limegreen', 'red')
    # The colour sequence is listed in the order the two categories first appear in the data.
    if chart_data.iloc[0]['Profit'] >= 0:
        color_sequence = ['limegreen', 'red']
    else:
        color_sequence = ['red', 'limegreen']

    chart = px.bar(
        chart_data,
        x='Date',
        y='Profit',
        title=title,
        color='Color',
        color_discrete_sequence=color_sequence,
        hover_data={'Color': False},
    )
    chart.update_layout({
        'font_color': '#7a7c7d',
        'plot_bgcolor': '#0f0f0f',
        'paper_bgcolor': '#0f0f0f',
        'title_font_color': '#7a7c7d',
        'xaxis': {
            'gridcolor': '#0f0f0f',
            'zerolinecolor': '#0f0f0f'
        },
        'yaxis': {
            'gridcolor': '#0f0f0f',
            'zerolinecolor': '#0f0f0f'
        }
    })
    chart.update_traces(marker_line_width=0, selector=dict(type='bar'))
    chart.update_layout(bargap=0, bargroupgap=0)
    chart.update_layout(showlegend=False)
    chart.update_xaxes(rangebreaks=[dict(values=sorted(excluded_dates))])
    chart.show()


def _opening_quotes(historical_option_data: pd.DataFrame, option_strat) -> pd.DataFrame:
    """Return the rows quoted at the strategy's entry time on the file's first date."""
    if historical_option_data.empty:
        return historical_option_data
    current_date = historical_option_data.iloc[0]['quote_datetime'][:10]
    entry_timestamp = current_date + ' ' + option_strat.trade_entry_time
    return historical_option_data[historical_option_data['quote_datetime'] == entry_timestamp]


def _find_long_strike(quotes: pd.DataFrame, short_strike, option_strat):
    """Return the nearest quoted long strike at or beyond the spread width, or None if there is none."""
    is_call = option_strat.option_type == OptionType.CALL
    strikes = quotes['strike'].dropna()
    if strikes.empty:
        return None

    width = option_strat.spread_width
    long_strike = short_strike + (width if is_call else -width)
    step = STRIKE_MULTIPLE if is_call else -STRIKE_MULTIPLE
    lowest, highest = strikes.min(), strikes.max()

    # Sometimes the strike we're interested in doesn't exist. Find the nearest one that does.
    while not (strikes == long_strike).any():
        # Once we walk past the outermost quoted strike nothing further out can match.
        if (is_call and long_strike > highest) or (not is_call and long_strike < lowest):
            return None
        long_strike = long_strike + step
    return long_strike


def _build_spread_history(historical_option_data: pd.DataFrame, short_strike, long_strike, option_strat) -> pd.DataFrame:
    """Join the full-day quotes of both legs on 'quote_datetime'.

    Columns present in both legs get the '_short_strike' / '_long_strike' suffixes.
    """
    same_type = historical_option_data['option_type'] == option_strat.option_type.value
    short_strike_history = historical_option_data[
        (historical_option_data['strike'] == short_strike) & same_type
    ].set_index('quote_datetime')
    long_strike_history = historical_option_data[
        (historical_option_data['strike'] == long_strike) & same_type
    ].set_index('quote_datetime')
    return short_strike_history.join(
        long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime'
    )


def get_spread_history_credit(historical_option_data: pd.DataFrame, option_strat: CreditTargetStrategy) -> Optional[pd.DataFrame]:
    """Build the day's quote history for a spread whose short leg bid is closest to a credit target.

    Args:
        historical_option_data: One day's quotes; reads the 'quote_datetime',
            'option_type', 'bid' and 'strike' columns.
        option_strat: Supplies trade_entry_time, option_type, credit_target and spread_width.

    Returns:
        The short leg's rows joined with the long leg's rows, indexed by
        'quote_datetime', or None when there are no quotes at the entry time,
        no usable bid for the option type, or no long strike to pair with.
    """
    opening_quotes = _opening_quotes(historical_option_data, option_strat)
    if opening_quotes.empty:
        return None

    opening_quotes_by_contract_type = opening_quotes[opening_quotes['option_type'] == option_strat.option_type.value]
    bids = opening_quotes_by_contract_type['bid']
    # Widening the window below can only ever succeed if at least one bid is present.
    if not bids.notna().any():
        return None

    # Start with a +/- 1.00 window around the target and widen it by 1.00 until something matches.
    credit_target = option_strat.credit_target
    tolerance = 1.0
    short_contract_candidates = opening_quotes_by_contract_type[
        (bids >= (credit_target - tolerance)) & (bids < (credit_target + tolerance))
    ]
    while short_contract_candidates.empty:
        tolerance += 1.0
        short_contract_candidates = opening_quotes_by_contract_type[
            (bids >= (credit_target - tolerance)) & (bids < (credit_target + tolerance))
        ]

    # For duplicate bids the last listed strike wins; ties in distance go to the first listed bid.
    strike_by_bid = dict(zip(short_contract_candidates['bid'], short_contract_candidates['strike']))
    closest_bid = min(strike_by_bid, key=lambda candidate_bid: abs(credit_target - candidate_bid))
    short_strike = strike_by_bid[closest_bid]
    logging.info('Short Strike: %s', short_strike)

    long_strike = _find_long_strike(opening_quotes_by_contract_type, short_strike, option_strat)
    if long_strike is None:
        return None
    logging.info('Long Strike: %s', long_strike)

    return _build_spread_history(historical_option_data, short_strike, long_strike, option_strat)


def get_spread_history(historical_option_data: pd.DataFrame, option_strat: DeltaTargetStrategy) -> Optional[pd.DataFrame]:
    """Build the day's quote history for a spread whose short leg is picked by delta.

    Args:
        historical_option_data: One day's quotes; reads the 'quote_datetime',
            'option_type', 'delta' and 'strike' columns.
        option_strat: Supplies trade_entry_time, option_type, delta_lower_bound,
            delta_upper_bound and spread_width. Bounds are given as positive
            numbers and are negated for puts.

    Returns:
        The short leg's rows joined with the long leg's rows, indexed by
        'quote_datetime', or None when there are no quotes at the entry time,
        no delta can satisfy the lower bound, or no long strike exists.
    """
    opening_quotes = _opening_quotes(historical_option_data, option_strat)
    if opening_quotes.empty:
        return None

    is_put = option_strat.option_type == OptionType.PUT
    deltas = opening_quotes['delta']
    lower_bound = option_strat.delta_lower_bound
    upper_bound = option_strat.delta_upper_bound

    # Only the upper bound is relaxed below, so a contract must already satisfy the lower bound.
    within_lower_bound = (deltas <= -lower_bound) if is_put else (deltas >= lower_bound)
    if not within_lower_bound.any():
        return None

    # Relax the upper bound in 0.01 steps until at least one contract qualifies.
    widening = 0.0
    while True:
        if is_put:
            within_upper_bound = deltas > (-upper_bound - widening)
        else:
            within_upper_bound = deltas < (upper_bound + widening)
        short_contract_candidates = opening_quotes[within_upper_bound & within_lower_bound]
        if not short_contract_candidates.empty:
            break
        widening = widening + 0.01

    # Might return more than one, take the last listed
    # (presumably the greatest strike - depends on the file's row order).
    short_strike = short_contract_candidates.iloc[-1]['strike']
    logging.info('Short Strike: %s', short_strike)

    # Look for the long strike among contracts of the same type only, since the
    # leg history built below is filtered by option type as well.
    opening_quotes_by_contract_type = opening_quotes[opening_quotes['option_type'] == option_strat.option_type.value]
    long_strike = _find_long_strike(opening_quotes_by_contract_type, short_strike, option_strat)
    if long_strike is None:
        return None
    logging.info('Long Strike: %s', long_strike)

    return _build_spread_history(historical_option_data, short_strike, long_strike, option_strat)


def _mid_spread_price(quote: pd.Series) -> float:
    """Return the short leg's bid/ask midpoint minus the long leg's bid/ask midpoint."""
    short_mid = (quote['ask_short_strike'] + quote['bid_short_strike']) / 2.0
    long_mid = (quote['ask_long_strike'] + quote['bid_long_strike']) / 2.0
    return short_mid - long_mid


def _bar_spread_price(quote: pd.Series, field: str) -> float:
    """Price the spread from a bar field ('high' or 'low'), falling back to the mid price.

    The bar value is only used when the short leg's value exceeds the long leg's.
    """
    short_value = quote[field + '_short_strike']
    long_value = quote[field + '_long_strike']
    if short_value > long_value:
        return short_value - long_value
    return _mid_spread_price(quote)


def _apply_entry_slippage(price: float) -> float:
    """Round a credit above one tick down to a multiple of the tick size."""
    if price > _MINIMUM_TICK:
        return price - (price % _MINIMUM_TICK)
    return price


def _new_spread_details(entry: pd.Series, option_type_name: str, open_price: float) -> dict:
    """Create the per-spread record stored in BacktestResult.spreads."""
    return {
        "legs": [
            {"action": "SELL", "strike": entry['strike_short_strike'], "type": option_type_name},
            {"action": "BUY", "strike": entry['strike_long_strike'], "type": option_type_name}
        ],
        "open": open_price,
        "high": None,
        "low": None,
        "close": None
    }


def _track_spread_price(details: dict, price: float) -> None:
    """Fold one observed price into a spread's high/low/close."""
    # Compare against None explicitly: a recorded 0.0 is a real price, not "unset".
    previous_high = float('-inf') if details['high'] is None else details['high']
    previous_low = float('inf') if details['low'] is None else details['low']
    details['high'] = max(previous_high, price)
    details['low'] = min(previous_low, price)
    details['close'] = price


def _backtest_iron_condor(
    historical_data_file: str,
    call_spread_strategy: OptionSpreadStrategy,
    put_spread_strategy: OptionSpreadStrategy
) -> Optional[BacktestResult]:
    """Simulate one day's iron condor from a single quote file.

    Both spreads are opened at the call strategy's trade_entry_time at their
    (tick-rounded) mid price. Each later bar is priced from the bar highs; a
    spread is stopped out once its price reaches (stop_loss_multiple + 1)
    times its entry credit. Spreads still open after the last evaluated bar
    are bought back at their last price if that exceeds one tick.

    Args:
        historical_data_file: Path of the day's CSV quote file.
        call_spread_strategy: Strategy for the call side; also supplies the
            entry time and number of contracts for the whole condor.
        put_spread_strategy: Strategy for the put side.

    Returns:
        The day's BacktestResult, or None when either spread could not be
        built or no bar after the entry could be evaluated.
    """
    print('Processing File:', historical_data_file)
    historical_option_data = pd.read_csv(historical_data_file)

    if isinstance(call_spread_strategy, CreditTargetStrategy):
        build_spread_history = get_spread_history_credit
    else:
        build_spread_history = get_spread_history
    call_spread_history = build_spread_history(historical_option_data, call_spread_strategy)
    put_spread_history = build_spread_history(historical_option_data, put_spread_strategy)
    if call_spread_history is None or put_spread_history is None:
        # The builders signal "no tradable spread today" with None; skip the day.
        logging.warning('Skipping %s: unable to build both spreads', historical_data_file)
        return None

    current_date = call_spread_history.iloc[0].name[:10]
    entry_time = call_spread_strategy.trade_entry_time
    entry_timestamp = current_date + ' ' + entry_time
    call_spread_entry = call_spread_history.loc[entry_timestamp]
    put_spread_entry = put_spread_history.loc[entry_timestamp]

    # Calculate entry slippage.
    original_call_spread_price = _apply_entry_slippage(_mid_spread_price(call_spread_entry))
    logging.info('Original Call Spread Price: %s', original_call_spread_price)
    original_put_spread_price = _apply_entry_slippage(_mid_spread_price(put_spread_entry))
    logging.info('Original Put Spread Price: %s', original_put_spread_price)

    premium_received = original_call_spread_price + original_put_spread_price
    call_stop_price = original_call_spread_price * (call_spread_strategy.stop_loss_multiple + 1)
    put_stop_price = original_put_spread_price * (put_spread_strategy.stop_loss_multiple + 1)
    number_of_contracts = call_spread_strategy.number_of_contracts

    call_spread_details = _new_spread_details(call_spread_entry, "CALL", original_call_spread_price)
    put_spread_details = _new_spread_details(put_spread_entry, "PUT", original_put_spread_price)

    trades_entered = False
    bar_evaluated = False
    call_spread_stopped_out = False
    put_spread_stopped_out = False
    max_profit = 0.0
    max_drawdown = 0.0
    exit_time = MARKET_CLOSE
    current_call_spread_price = None
    current_put_spread_price = None

    # Rows of the two histories are paired by position, not by timestamp.
    paired_bars = zip(call_spread_history.iterrows(), put_spread_history.iterrows())
    for (timestamp, call_spread), (_, put_spread) in paired_bars:
        if timestamp.endswith(entry_time):
            trades_entered = True
            continue
        if not trades_entered or timestamp.endswith(_POST_CLOSE_TIMES):
            continue
        bar_evaluated = True

        # Stops are tested against the bar highs.
        current_call_spread_price = _bar_spread_price(call_spread, 'high')
        current_put_spread_price = _bar_spread_price(put_spread, 'high')

        # A stopped-out spread keeps the high/low/close it had when it was closed.
        if not call_spread_stopped_out:
            _track_spread_price(call_spread_details, current_call_spread_price)
            if current_call_spread_price >= call_stop_price:
                premium_received -= call_stop_price
                premium_received -= _STOP_OUT_SLIPPAGE  # Calculate exit slippage.
                call_spread_details['close'] = call_stop_price + _STOP_OUT_SLIPPAGE
                call_spread_stopped_out = True
                exit_time = timestamp[-8:]
                logging.info('Call Spread Stopped Out')

        if not put_spread_stopped_out:
            _track_spread_price(put_spread_details, current_put_spread_price)
            if current_put_spread_price >= put_stop_price:
                premium_received -= put_stop_price
                premium_received -= _STOP_OUT_SLIPPAGE  # Calculate exit slippage.
                put_spread_details['close'] = put_stop_price + _STOP_OUT_SLIPPAGE
                put_spread_stopped_out = True
                exit_time = timestamp[-8:]
                logging.info('Put Spread Stopped Out')

        # Mark the position for the profit/drawdown excursion figures.
        if call_spread_stopped_out and put_spread_stopped_out:
            # Fully closed: both sides stay pinned at their stop prices.
            current_call_spread_price = call_stop_price
            current_put_spread_price = put_stop_price
        elif call_spread_stopped_out:
            # The surviving put side keeps its high-based price from above.
            current_call_spread_price = call_stop_price
        elif put_spread_stopped_out:
            # The surviving call side keeps its high-based price from above.
            current_put_spread_price = put_stop_price
        elif current_call_spread_price > current_put_spread_price:
            # Both open: the cheaper side is marked at its bar low instead of its high.
            current_put_spread_price = _bar_spread_price(put_spread, 'low')
        else:
            current_call_spread_price = _bar_spread_price(call_spread, 'low')

        current_profit = (
            (original_call_spread_price - current_call_spread_price)
            + (original_put_spread_price - current_put_spread_price)
        )
        current_profit_dollars = current_profit * number_of_contracts * 100
        if current_profit_dollars > max_profit:
            max_profit = current_profit_dollars
        if current_profit_dollars < max_drawdown:
            max_drawdown = current_profit_dollars

    if not bar_evaluated:
        # Without a bar after the entry there is no price to close the trade at.
        logging.warning('Skipping %s: no quotes after the entry time', historical_data_file)
        return None

    # Buy back whatever is still open, unless it is worth no more than one tick.
    if not call_spread_stopped_out and current_call_spread_price > _MINIMUM_TICK:
        premium_received -= current_call_spread_price
    if not put_spread_stopped_out and current_put_spread_price > _MINIMUM_TICK:
        premium_received -= current_put_spread_price

    # It costs money to get stopped out.
    stop_out_fees = 0.0
    if call_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)
    if put_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)

    fees = 4 * FEES_PER_CONTRACT * number_of_contracts
    commissions = 4 * number_of_contracts
    premium_received = (premium_received * number_of_contracts * 100) - (fees + commissions) - stop_out_fees

    dates.append(current_date)
    max_drawdowns.append(max_drawdown)
    max_profits.append(max_profit)
    wins.append(bool(premium_received > 0))
    exit_times.append(exit_time)

    result = BacktestResult(
        date=current_date,
        entry_time=f'{current_date} {entry_time}',
        exit_time=f'{current_date} {exit_time}',
        spreads=[call_spread_details, put_spread_details],
        trade_entered=True,
        trade_pnl=premium_received,
        profit=0.0,  # TODO: Calculated elsewhere. Clean this up.
        credit=original_call_spread_price + original_put_spread_price,
        mfe=max_profit,
        mae=max_drawdown
    )

    logging.info('Premium Received: %f', premium_received)
    return result


def backtest_iron_condor(
    backtest_name: str,
    call_spread_strategy: OptionSpreadStrategy,
    put_spread_strategy: OptionSpreadStrategy,
    start_date: datetime,
    end_date: datetime
) -> pd.DataFrame:
    """Backtest an iron condor over every quote file between two dates (inclusive).

    Files are read from OPTION_DATA_DIRECTORY/<year>/YYYY-MM-DD.csv and are
    simulated in parallel worker processes, one file per task.

    Args:
        backtest_name: Label written to the 'Strategy' column.
        call_spread_strategy: Strategy for the call side.
        put_spread_strategy: Strategy for the put side.
        start_date: First day to include; only its calendar date is used.
        end_date: Last day to include; only its calendar date is used.

    Returns:
        One row per simulated day, in chronological order, with the columns
        'Date', 'Symbol', 'Strategy', 'Entry Time', 'Exit Time', 'Spreads',
        'Profit' (that day's PnL) and 'Cumulative Profit'.
    """
    # Compare calendar days so a start/end carrying a time of day still includes that day.
    first_day = start_date.date() if isinstance(start_date, datetime) else start_date
    last_day = end_date.date() if isinstance(end_date, datetime) else end_date

    futures = []
    with ProcessPoolExecutor(max_workers=10) as executor:
        for year in range(start_date.year, end_date.year + 1):
            year_directory = os.path.join(OPTION_DATA_DIRECTORY, str(year))
            if not os.path.isdir(year_directory):
                logging.warning('No data directory for %s: %s', year, year_directory)
                continue

            # os.listdir order is arbitrary; sort so cumulative profit builds chronologically.
            for file in sorted(os.listdir(year_directory)):
                historical_data_file = os.path.join(year_directory, file)
                if os.path.isdir(historical_data_file) or not file.endswith('.csv'):
                    continue

                # Assuming file format 'YYYY-MM-DD.csv'.
                file_date_str = os.path.splitext(file)[0]
                try:
                    file_day = datetime.strptime(file_date_str, '%Y-%m-%d').date()
                except ValueError:
                    logging.warning('Skipping file with unexpected name: %s', historical_data_file)
                    continue
                if file_day < first_day or file_day > last_day:
                    continue

                logging.info('Processing File: %s', historical_data_file)
                futures.append(executor.submit(
                    _backtest_iron_condor, historical_data_file, call_spread_strategy, put_spread_strategy
                ))

    total_premium_received = 0.0
    total_trades = 0
    total_wins = 0
    completed_results = []
    for future in futures:
        backtest_result = future.result()
        if backtest_result is None:
            continue
        total_premium_received += backtest_result.trade_pnl
        backtest_result.profit = total_premium_received
        completed_results.append(backtest_result)
        if backtest_result.trade_entered:
            total_trades += 1
            if backtest_result.trade_pnl > 0.0:
                total_wins += 1

    logging.info('Overall PnL: %f', total_premium_received)
    logging.info('Win Rate: %f', (total_wins / total_trades) if total_trades > 0 else 0.0)
    logging.info('Average Premium Received: %f', (total_premium_received / total_trades) if total_trades > 0 else 0.0)

    # TODO: Either look up the symbol in the historical option data or have the client provide it.
    result_columns = [
        'Date', 'Symbol', 'Strategy', 'Entry Time', 'Exit Time', 'Spreads', 'Profit', 'Cumulative Profit'
    ]
    # Passing the columns explicitly keeps the schema even when no day produced a result.
    backtest_results = pd.DataFrame([{
        'Date': result.date,
        'Symbol': 'SPX',
        'Strategy': backtest_name,
        'Entry Time': result.entry_time,
        'Exit Time': result.exit_time,
        'Spreads': result.spreads,
        'Profit': result.trade_pnl,
        'Cumulative Profit': result.profit
    } for result in completed_results], columns=result_columns)

    return backtest_results