From 94d05f060fed819b27976798204179bde49dbfd2 Mon Sep 17 00:00:00 2001 From: moshferatu Date: Thu, 28 Dec 2023 07:48:14 -0800 Subject: [PATCH] Add script for backtesting iron condors based on entry time and target delta or credit --- backtest_iron_condor.py | 396 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 396 insertions(+) create mode 100644 backtest_iron_condor.py diff --git a/backtest_iron_condor.py b/backtest_iron_condor.py new file mode 100644 index 0000000..7b02fb1 --- /dev/null +++ b/backtest_iron_condor.py @@ -0,0 +1,396 @@ +import logging +import numpy as np +import os +import pandas as pd +import plotly.express as px + +from dotenv import load_dotenv +from dataclasses import dataclass +from datetime import datetime, timedelta +from enum import Enum +from os import getenv + +load_dotenv() + +logging.basicConfig(level=logging.WARN) + +FEES_PER_CONTRACT = 0.80 +MARKET_CLOSE = '16:00:00' +MARKET_OPEN = '09:35:00' +OPTION_DATA_DIRECTORY = os.getenv('OPTION_DATA_DIRECTORY') +STRIKE_MULTIPLE = 5.0 + +class OptionType(Enum): + PUT = 'P' + CALL = 'C' + +@dataclass +class OptionStrat: + delta_upper_bound: float + delta_lower_bound: float + credit_target: float + max_loss: float + number_of_contracts: int + option_type: OptionType + spread_width: int + stop_loss_multiple: float + stop_loss_percent: float + trade_entry_time: str + +@dataclass +class BacktestResult: + date: str + trade_entered: bool + trade_pnl: float + profit: float + credit: float + mfe: float + mae: float + +# Metrics +dates = [] +max_drawdowns = [] +max_profits = [] +wins = [] +exit_times = [] + +def plot(backtest_results: pd.DataFrame, title: str): + backtest_results.rename(columns = {'date' : 'Date', 'profit' : 'Profit'}, inplace = True) + + # Exclude dates on which the market was closed from being plotted in order to prevent gaps on chart. + backtest_date_range = pd.date_range(start="2016-01-01", end="2023-12-31").to_list() + backtest_date_range = set([timestamp.strftime('%Y-%m-%d') for timestamp in backtest_date_range]) + backtested_dates = set(backtest_results['Date'].to_list()) + excluded_dates = backtest_date_range - backtested_dates + + backtest_results['Color'] = np.where(backtest_results['Profit'] >= 0, 'limegreen', 'red') + color_sequence = ['limegreen', 'red'] if backtest_results.iloc[0]['Profit'] >= 0 else ['red', 'limegreen'] + + print('Backtest Results:') + print(backtest_results) + + chart = px.bar(backtest_results, x='Date', y='Profit', title=title, color='Color', color_discrete_sequence=color_sequence, hover_data={'Color': False}) + chart.update_layout({ + 'font_color': '#7a7c7d', + 'plot_bgcolor': '#0f0f0f', + 'paper_bgcolor': '#0f0f0f', + 'title_font_color': '#7a7c7d', + 'xaxis': { + 'gridcolor': '#0f0f0f', + 'zerolinecolor': '#0f0f0f' + }, + 'yaxis': { + 'gridcolor': '#0f0f0f', + 'zerolinecolor': '#0f0f0f' + } + }) + chart.update_traces(marker_line_width=0, selector=dict(type='bar')) + chart.update_layout(bargap=0, bargroupgap = 0) + chart.update_layout(showlegend=False) + chart.update_xaxes(rangebreaks=[dict(values=list(excluded_dates))]) + chart.show() + +def get_spread_history_credit(historical_option_data: pd.DataFrame, option_strat: OptionStrat) -> pd.DataFrame: + current_date = historical_option_data.iloc[0]['quote_datetime'][:10] + opening_quotes = historical_option_data[(historical_option_data['quote_datetime'] == (current_date + ' ' + option_strat.trade_entry_time))] + if opening_quotes.empty: + return None + else: + opening_quotes_by_contract_type = opening_quotes[opening_quotes['option_type'] == option_strat.option_type.value] + short_contract_candidates = opening_quotes_by_contract_type[(opening_quotes_by_contract_type['bid'] >= (option_strat.credit_target - 1.0)) & (opening_quotes_by_contract_type['bid'] < (option_strat.credit_target + 1.0))] + credit_increment = 2.00 + while short_contract_candidates.empty: + short_contract_candidates = opening_quotes_by_contract_type[(opening_quotes_by_contract_type['bid'] >= (option_strat.credit_target - credit_increment)) & (opening_quotes_by_contract_type['bid'] < (option_strat.credit_target + credit_increment))] + credit_increment += 1.00 + + strike_candidates = {} + for i in range(len(short_contract_candidates)): + candidate = short_contract_candidates.iloc[i] + strike_candidates[candidate['bid']] = candidate['strike'] + + closest_bid = min(strike_candidates, key=lambda candidate_bid: abs(option_strat.credit_target - candidate_bid)) + short_strike = strike_candidates[closest_bid] + logging.info('Short Strike: %s', short_strike) + + long_strike = short_strike + (option_strat.spread_width if option_strat.option_type == OptionType.CALL else -option_strat.spread_width) + # Sometimes the strike we're interested in doesn't exist. Find the nearest one that does. + increment = 5.0 if option_strat.option_type == OptionType.CALL else -5.0 + while opening_quotes_by_contract_type[(opening_quotes_by_contract_type['strike'] == long_strike)].empty: # TODO: Doesn't work for one day in 2018. + long_strike = long_strike + increment + logging.info('Long Strike: %s', long_strike) + + short_strike_history = historical_option_data[(historical_option_data['strike'] == short_strike) & (historical_option_data['option_type'] == option_strat.option_type.value)].set_index('quote_datetime') + long_strike_history = historical_option_data[(historical_option_data['strike'] == long_strike) & (historical_option_data['option_type'] == option_strat.option_type.value)].set_index('quote_datetime') + spread_history = short_strike_history.join(long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime') + return spread_history + +def get_spread_history(historical_option_data: pd.DataFrame, option_strat: OptionStrat) -> pd.DataFrame: + current_date = historical_option_data.iloc[0]['quote_datetime'][:10] + opening_quotes = historical_option_data[(historical_option_data['quote_datetime'] == (current_date + ' ' + option_strat.trade_entry_time))] + if opening_quotes.empty: + return None + else: + if option_strat.option_type == OptionType.PUT: + short_contract_candidates = opening_quotes[(opening_quotes['delta'] > -option_strat.delta_upper_bound) & (opening_quotes['delta'] <= -option_strat.delta_lower_bound)] + else: + short_contract_candidates = opening_quotes[(opening_quotes['delta'] < option_strat.delta_upper_bound) & (opening_quotes['delta'] >= option_strat.delta_lower_bound)] + delta_increment = 0.01 + while short_contract_candidates.empty: + if option_strat.option_type == OptionType.PUT: + short_contract_candidates = opening_quotes[(opening_quotes['delta'] > (-option_strat.delta_upper_bound - delta_increment)) & (opening_quotes['delta'] <= -option_strat.delta_lower_bound)] + else: + short_contract_candidates = opening_quotes[(opening_quotes['delta'] < (option_strat.delta_upper_bound + delta_increment)) & (opening_quotes['delta'] >= option_strat.delta_lower_bound)] + delta_increment = delta_increment + 0.01 + # Might return more than one, take greatest strike + short_contract = short_contract_candidates.iloc[-1] + + short_strike = short_contract['strike'] + logging.info('Short Strike: %s', short_strike) + + long_strike = short_strike + (option_strat.spread_width if option_strat.option_type == OptionType.CALL else -option_strat.spread_width) + # Sometimes the strike we're interested in doesn't exist. Find the nearest one that does. + increment = 5.0 if option_strat.option_type == OptionType.CALL else -5.0 + while opening_quotes[(opening_quotes['strike'] == long_strike)].empty: # TODO: Doesn't work for one day in 2018. + long_strike = long_strike + increment + logging.info('Long Strike: %s', long_strike) + + short_strike_history = historical_option_data[(historical_option_data['strike'] == short_strike) & (historical_option_data['option_type'] == option_strat.option_type.value)].set_index('quote_datetime') + long_strike_history = historical_option_data[(historical_option_data['strike'] == long_strike) & (historical_option_data['option_type'] == option_strat.option_type.value)].set_index('quote_datetime') + spread_history = short_strike_history.join(long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime') + return spread_history + +def _backtest_iron_condor( + historical_option_data: pd.DataFrame, + call_spread_strat: OptionStrat, + put_spread_strat: OptionStrat + ) -> BacktestResult: + + call_spread_history = get_spread_history(historical_option_data, call_spread_strat) + put_spread_history = get_spread_history(historical_option_data, put_spread_strat) + + current_date = call_spread_history.iloc[0].name[:10] + + call_spread_entry = call_spread_history.loc[current_date + ' ' + call_spread_strat.trade_entry_time] + original_call_spread_price = ((call_spread_entry['ask_short_strike'] + call_spread_entry['bid_short_strike']) / 2.0) - ((call_spread_entry['ask_long_strike'] + call_spread_entry['bid_long_strike']) / 2.0) + + put_spread_entry = put_spread_history.loc[current_date + ' ' + put_spread_strat.trade_entry_time] + original_put_spread_price = ((put_spread_entry['ask_short_strike'] + put_spread_entry['bid_short_strike']) / 2.0) - ((put_spread_entry['ask_long_strike'] + put_spread_entry['bid_long_strike']) / 2.0) + + # Calculate entry slippage. + if original_call_spread_price > 0.05: + original_call_spread_price = original_call_spread_price - (original_call_spread_price % 0.05) + logging.info('Original Call Spread Price: %s', original_call_spread_price) + + if original_put_spread_price > 0.05: + original_put_spread_price = original_put_spread_price - (original_put_spread_price % 0.05) + logging.info('Original Put Spread Price: %s', original_put_spread_price) + + premium_received = original_call_spread_price + original_put_spread_price + + trades_entered = False + call_spread_stopped_out = False + put_spread_stopped_out = False + took_profit = False + took_early_loss = False + + max_profit = 0.0 + max_drawdown = 0.0 + exit_time = 160000 + + for i in range(len(call_spread_history)): + call_spread = call_spread_history.iloc[i] + put_spread = put_spread_history.iloc[i] + + if call_spread.name.endswith(call_spread_strat.trade_entry_time): + trades_entered = True + continue + + if not trades_entered: + continue + + if call_spread.name.endswith('16:05:00') or call_spread.name.endswith('16:10:00') or call_spread.name.endswith('16:15:00'): + continue + + if call_spread['high_short_strike'] > call_spread['high_long_strike']: + current_call_spread_price = call_spread['high_short_strike'] - call_spread['high_long_strike'] + else: + current_call_spread_price = ((call_spread['ask_short_strike'] + call_spread['bid_short_strike']) / 2.0) - ((call_spread['ask_long_strike'] + call_spread['bid_long_strike']) / 2.0) + if put_spread['high_short_strike'] > put_spread['high_long_strike']: + current_put_spread_price = put_spread['high_short_strike'] - put_spread['high_long_strike'] + else: + current_put_spread_price = ((put_spread['ask_short_strike'] + put_spread['bid_short_strike']) / 2.0) - ((put_spread['ask_long_strike'] + put_spread['bid_long_strike']) / 2.0) + + if not call_spread_stopped_out: + if current_call_spread_price >= ((call_spread_strat.stop_loss_multiple + 1) * original_call_spread_price): + premium_received -= original_call_spread_price * (call_spread_strat.stop_loss_multiple + 1) + # Calculate exit slippage. + premium_received -= 0.10 # TODO: Make this configurable. + call_spread_stopped_out = True + exit_time = int(call_spread.name[-8:].replace(':', '')) + logging.info('Call Spread Stopped Out') + + if not put_spread_stopped_out: + if current_put_spread_price >= ((put_spread_strat.stop_loss_multiple + 1) * original_put_spread_price): + premium_received -= original_put_spread_price * (put_spread_strat.stop_loss_multiple + 1) + premium_received -= 0.10 # TODO: Make this configurable. + put_spread_stopped_out = True + exit_time = int(call_spread.name[-8:].replace(':', '')) + logging.info('Put Spread Stopped Out') + + if not (call_spread_stopped_out and put_spread_stopped_out): + if current_call_spread_price > current_put_spread_price: + if put_spread['low_short_strike'] > put_spread['low_long_strike']: + current_put_spread_price = put_spread['low_short_strike'] - put_spread['low_long_strike'] + else: + current_put_spread_price = ((put_spread['ask_short_strike'] + put_spread['bid_short_strike']) / 2.0) - ((put_spread['ask_long_strike'] + put_spread['bid_long_strike']) / 2.0) + else: + if call_spread['low_short_strike'] > call_spread['low_long_strike']: + current_call_spread_price = call_spread['low_short_strike'] - call_spread['low_long_strike'] + else: + current_call_spread_price = ((call_spread['ask_short_strike'] + call_spread['bid_short_strike']) / 2.0) - ((call_spread['ask_long_strike'] + call_spread['bid_long_strike']) / 2.0) + if call_spread_stopped_out: + current_call_spread_price = original_call_spread_price * (call_spread_strat.stop_loss_multiple + 1) + if put_spread['high_short_strike'] > put_spread['high_long_strike']: + current_put_spread_price = put_spread['high_short_strike'] - put_spread['high_long_strike'] + else: + current_put_spread_price = ((put_spread['ask_short_strike'] + put_spread['bid_short_strike']) / 2.0) - ((put_spread['ask_long_strike'] + put_spread['bid_long_strike']) / 2.0) + if put_spread_stopped_out: + current_put_spread_price = original_put_spread_price * (put_spread_strat.stop_loss_multiple + 1) + if call_spread['high_short_strike'] > call_spread['high_long_strike']: + current_call_spread_price = call_spread['high_short_strike'] - call_spread['high_long_strike'] + else: + current_call_spread_price = ((call_spread['ask_short_strike'] + call_spread['bid_short_strike']) / 2.0) - ((call_spread['ask_long_strike'] + call_spread['bid_long_strike']) / 2.0) + current_profit = (original_call_spread_price - current_call_spread_price) + (original_put_spread_price - current_put_spread_price) + current_profit_dollars = current_profit * call_spread_strat.number_of_contracts * 100 + if current_profit_dollars > max_profit: + max_profit = current_profit_dollars + if current_profit_dollars < max_drawdown: + max_drawdown = current_profit_dollars + + if not took_profit and not took_early_loss: + if not call_spread_stopped_out and current_call_spread_price > 0.05: + premium_received -= current_call_spread_price + + if not put_spread_stopped_out and current_put_spread_price > 0.05: + premium_received -= current_put_spread_price + + number_of_contracts = call_spread_strat.number_of_contracts + stop_out_fees = 0.0 # It costs money to get stopped out. + if call_spread_stopped_out: + stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts) + if put_spread_stopped_out: + stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts) + fees = 4 * FEES_PER_CONTRACT * number_of_contracts + commissions = 4 * number_of_contracts + premium_received = (premium_received * number_of_contracts * 100) - (fees + commissions) - stop_out_fees + + dates.append(current_date) + max_drawdowns.append(max_drawdown) + max_profits.append(max_profit) + wins.append(True if premium_received > 0 else False) + exit_times.append(exit_time) + + result = BacktestResult( + date = current_date, + trade_entered = True, + trade_pnl = premium_received, + profit = 0.0, + credit = original_call_spread_price + original_put_spread_price, + mfe = max_profit, + mae = max_drawdown + ) + + logging.info('Premium Received: %f', premium_received) + return result + +def backtest_iron_condor( + call_spread_strat: OptionStrat, + put_spread_strat: OptionStrat, + start_date: datetime, + end_date: datetime + ) -> pd.DataFrame: + + total_premium_received = 0.0 + total_trades = 0.0 + total_wins = 0.0 + + result_dates = [] + result_pnl = [] + + results = [] + + start_year = start_date.year + end_year = end_date.year + for year in range(start_year, end_year + 1): + year_directory = os.path.join(OPTION_DATA_DIRECTORY, str(year)) + + for file in os.listdir(year_directory): + historical_data_file = os.path.join(year_directory, file) + if os.path.isdir(historical_data_file) or not file.endswith('.csv'): + continue + + # Assuming file format 'YYYY-MM-DD.csv'. + file_date_str = os.path.splitext(file)[0] + file_date = datetime.strptime(file_date_str, '%Y-%m-%d') + + if file_date < start_date or file_date > end_date: + continue + + print('Processing File:', historical_data_file) + logging.info('Processing File: %s', historical_data_file) + + historical_option_data = pd.read_csv(historical_data_file) + backtest_result = _backtest_iron_condor(historical_option_data, call_spread_strat, put_spread_strat) + total_premium_received += backtest_result.trade_pnl + backtest_result.profit = total_premium_received + results.append(backtest_result) + + if backtest_result.trade_entered: + total_trades += 1 + if backtest_result.trade_pnl > 0.0: + total_wins += 1 + + logging.info('Overall PnL: %f', total_premium_received) + logging.info('Win Rate: %f', (total_wins / total_trades) if total_trades > 0 else 0.0) + logging.info('Average Premium Received: %f', (total_premium_received / total_trades) if total_trades > 0 else 0.0) + + current_date = historical_option_data.iloc[0]['quote_datetime'][:10] + result_dates.append(current_date) + result_pnl.append(total_premium_received) + + backtest_results = pd.DataFrame([result.__dict__ for result in results]) + return backtest_results + +def create_strategies(entry_time, number_of_contracts=1): + call_spread_strat = OptionStrat( + delta_upper_bound=0.11, + delta_lower_bound=0.10, + credit_target=1.50, + max_loss=5000, + number_of_contracts=number_of_contracts, + option_type=OptionType.CALL, + spread_width=50, + stop_loss_multiple=1.00, + stop_loss_percent=1.0, + trade_entry_time=entry_time + ) + put_spread_strat = OptionStrat( + delta_upper_bound=0.11, + delta_lower_bound=0.10, + credit_target=1.50, + max_loss=5000, + number_of_contracts=number_of_contracts, + option_type=OptionType.PUT, + spread_width=50, + stop_loss_multiple=1.00, + stop_loss_percent=1.0, + trade_entry_time=entry_time + ) + return call_spread_strat, put_spread_strat + +if __name__ == '__main__': + start_date = datetime(2023, 1, 1) + end_date = datetime.now() + call_spread_strat, put_spread_strat = create_strategies(entry_time = '10:05:00') + backtest_result = backtest_iron_condor(call_spread_strat, put_spread_strat, start_date, end_date) + plot(backtest_result, title = 'Iron Condor Backtest Results') \ No newline at end of file