# options-backtesting/backtesting/backtest_iron_condor.py
#
# 421 lines
# 20 KiB
# Python

import logging
import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

import numpy as np
import pandas as pd
import plotly.express as px
from dotenv import load_dotenv
# Load variables from a .env file (python-dotenv) before the environment is
# read below, so OPTION_DATA_DIRECTORY can be configured there.
load_dotenv()
# Only WARN and above are emitted; the logging.info() calls in this module are
# silent unless this level is lowered.
logging.basicConfig(level=logging.WARN)
# Per-contract fee (presumably dollars) subtracted when computing net trade PnL.
FEES_PER_CONTRACT = 0.80
# NOTE(review): MARKET_CLOSE, MARKET_OPEN and STRIKE_MULTIPLE are not referenced
# in the visible part of this file.
MARKET_CLOSE = '16:00:00'
MARKET_OPEN = '09:35:00'
# Root directory holding one sub-directory per year of daily quote CSV files.
# This is None when the environment variable is not set.
OPTION_DATA_DIRECTORY = os.getenv('OPTION_DATA_DIRECTORY')
STRIKE_MULTIPLE = 5.0
class OptionType(Enum):
    """Option side; the values are compared against the `option_type` column of the quote data."""
    PUT = 'P'
    CALL = 'C'
@dataclass
class OptionStrat:
    """Configuration for one vertical credit spread (one side of the iron condor)."""
    # Bounds on the short contract's delta magnitude; get_spread_history()
    # negates them for puts and widens the upper bound if nothing matches.
    delta_upper_bound: float
    delta_lower_bound: float
    # Target bid of the short contract; only read by get_spread_history_credit().
    credit_target: float
    # NOTE(review): not read anywhere in the visible code.
    max_loss: float
    # Multiplier for per-share prices (together with 100) and for fees.
    number_of_contracts: int
    option_type: OptionType
    # Distance between short and long strike; added for calls, subtracted for puts.
    spread_width: int
    # A spread is stopped out once its price reaches
    # (stop_loss_multiple + 1) times its entry price.
    stop_loss_multiple: float
    # NOTE(review): not read anywhere in the visible code.
    stop_loss_percent: float
    # Time of day as 'HH:MM:SS'; appended to the date to match `quote_datetime` values.
    trade_entry_time: str
@dataclass
class BacktestResult:
    """Outcome of backtesting a single trading day."""
    # First ten characters of the day's `quote_datetime` values.
    date: str
    # '<date> <time>' strings.
    entry_time: str
    exit_time: str
    trade_entered: bool
    # Net PnL of this trade after fees and commissions.
    trade_pnl: float
    # Running total of trade_pnl; filled in by backtest_iron_condor().
    profit: float
    # Sum of the call- and put-spread entry prices (per share, before the x100 multiplier).
    credit: float
    # Highest and lowest mark-to-market profit seen during the trade
    # (price difference times contracts times 100).
    mfe: float
    mae: float
# Metrics
# Module-level accumulators: _backtest_iron_condor() appends one entry to each
# list per processed day. Nothing in this file clears them, so they keep growing
# across repeated backtests within the same process.
dates = []
# Lowest mark-to-market profit seen during each day's trade.
max_drawdowns = []
# Highest mark-to-market profit seen during each day's trade.
max_profits = []
# True when the day's net PnL was positive.
wins = []
# 'HH:MM:SS' of the most recent stop-out, or '16:00:00' when neither spread was stopped.
exit_times = []
def plot(backtest_results: pd.DataFrame, title: str):
    """Show a bar chart of the cumulative profit for every backtested date.

    Args:
        backtest_results: Frame with 'Date' ('YYYY-MM-DD' strings), 'Profit'
            (per-trade PnL) and 'Cumulative Profit' columns, as returned by
            backtest_iron_condor(). The frame is not modified.
        title: Title displayed above the chart.

    Returns:
        None. When there are no results a warning is logged and nothing is drawn.
    """
    if backtest_results.empty:
        logging.warning('No backtest results to plot.')
        return

    # The chart shows the running total under the 'Profit' label. drop() and
    # rename() return new frames, so the caller's DataFrame is left untouched.
    plot_data = backtest_results.drop(columns='Profit').rename(columns={'Cumulative Profit': 'Profit'})

    # Exclude dates on which the market was closed from being plotted in order to prevent gaps on chart.
    calendar_days = {
        day.strftime('%Y-%m-%d')
        for day in pd.date_range(start=plot_data['Date'].min(), end=plot_data['Date'].max())
    }
    excluded_dates = sorted(calendar_days - set(plot_data['Date']))

    plot_data['Color'] = np.where(plot_data['Profit'] >= 0, 'limegreen', 'red')
    # Colors are handed out in order of first appearance of each 'Color' value,
    # so the sequence has to start with the color of the first row.
    if plot_data.iloc[0]['Profit'] >= 0:
        color_sequence = ['limegreen', 'red']
    else:
        color_sequence = ['red', 'limegreen']

    chart = px.bar(
        plot_data,
        x='Date',
        y='Profit',
        title=title,
        color='Color',
        color_discrete_sequence=color_sequence,
        hover_data={'Color': False},
    )
    chart.update_layout({
        'font_color': '#7a7c7d',
        'plot_bgcolor': '#0f0f0f',
        'paper_bgcolor': '#0f0f0f',
        'title_font_color': '#7a7c7d',
        'xaxis': {
            'gridcolor': '#0f0f0f',
            'zerolinecolor': '#0f0f0f'
        },
        'yaxis': {
            'gridcolor': '#0f0f0f',
            'zerolinecolor': '#0f0f0f'
        }
    })
    chart.update_traces(marker_line_width=0, selector=dict(type='bar'))
    chart.update_layout(bargap=0, bargroupgap=0, showlegend=False)
    chart.update_xaxes(rangebreaks=[dict(values=excluded_dates)])
    chart.show()
def get_spread_history_credit(historical_option_data: pd.DataFrame, option_strat: OptionStrat) -> Optional[pd.DataFrame]:
    """Select a vertical spread by the short leg's bid and return its quote history.

    The short contract is the one whose bid at the entry time is closest to
    `option_strat.credit_target`; the long contract sits `spread_width` strike
    points further away (above for calls, below for puts).

    Args:
        historical_option_data: One day of quotes with at least the columns
            'quote_datetime' ('YYYY-MM-DD HH:MM:SS' strings), 'option_type',
            'strike' and 'bid'.
        option_strat: Spread configuration (type, credit target, width, entry time).

    Returns:
        The short-strike quotes joined with the long-strike quotes, indexed by
        'quote_datetime', with columns suffixed '_short_strike' and
        '_long_strike'. None when no spread can be built: no quotes at the
        entry time, no quoted bid for the requested option type, or no listed
        long strike at or beyond the requested width.
    """
    if historical_option_data.empty:
        return None
    current_date = historical_option_data.iloc[0]['quote_datetime'][:10]
    entry_timestamp = current_date + ' ' + option_strat.trade_entry_time
    opening_quotes = historical_option_data[historical_option_data['quote_datetime'] == entry_timestamp]
    contract_type = option_strat.option_type.value
    opening_quotes_by_contract_type = opening_quotes[opening_quotes['option_type'] == contract_type]

    bids = opening_quotes_by_contract_type['bid']
    if not bids.notna().any():
        # Without a single quoted bid the widening search below would never terminate.
        return None

    # Widen a window around the credit target one dollar at a time until it
    # contains at least one contract. The window is half-open: [target - w, target + w).
    credit_increment = 1.00
    while True:
        in_window = (bids >= (option_strat.credit_target - credit_increment)) & (bids < (option_strat.credit_target + credit_increment))
        short_contract_candidates = opening_quotes_by_contract_type[in_window]
        if not short_contract_candidates.empty:
            break
        credit_increment += 1.00

    # When several candidates share a bid, the last one listed wins.
    strike_candidates = dict(zip(short_contract_candidates['bid'], short_contract_candidates['strike']))
    closest_bid = min(strike_candidates, key=lambda candidate_bid: abs(option_strat.credit_target - candidate_bid))
    short_strike = strike_candidates[closest_bid]
    logging.info('Short Strike: %s', short_strike)

    is_call = option_strat.option_type == OptionType.CALL
    long_strike = short_strike + (option_strat.spread_width if is_call else -option_strat.spread_width)
    # Sometimes the strike we're interested in doesn't exist. Find the nearest one that does,
    # stepping further out of the money, and give up once the listed chain is exhausted.
    available_strikes = set(opening_quotes_by_contract_type['strike'])
    increment = STRIKE_MULTIPLE if is_call else -STRIKE_MULTIPLE
    furthest_strike = max(available_strikes) if is_call else min(available_strikes)
    while long_strike not in available_strikes:
        if (long_strike - furthest_strike) * increment > 0:
            logging.warning('No long strike listed at or beyond %s on %s.', long_strike, current_date)
            return None
        long_strike = long_strike + increment
    logging.info('Long Strike: %s', long_strike)

    contract_history = historical_option_data[historical_option_data['option_type'] == contract_type]
    short_strike_history = contract_history[contract_history['strike'] == short_strike].set_index('quote_datetime')
    long_strike_history = contract_history[contract_history['strike'] == long_strike].set_index('quote_datetime')
    spread_history = short_strike_history.join(long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime')
    return spread_history
def get_spread_history(historical_option_data: pd.DataFrame, option_strat: OptionStrat) -> Optional[pd.DataFrame]:
    """Select a vertical spread by the short leg's delta and return its quote history.

    The short contract is taken from the entry-time quotes whose delta lies in
    [delta_lower_bound, delta_upper_bound) for calls, or in the mirrored
    negative range for puts. If nothing matches, the upper bound is widened in
    0.01 steps. The long contract sits `spread_width` strike points further
    away (above for calls, below for puts).

    Args:
        historical_option_data: One day of quotes with at least the columns
            'quote_datetime' ('YYYY-MM-DD HH:MM:SS' strings), 'option_type',
            'strike' and 'delta'.
        option_strat: Spread configuration (type, delta bounds, width, entry time).

    Returns:
        The short-strike quotes joined with the long-strike quotes, indexed by
        'quote_datetime', with columns suffixed '_short_strike' and
        '_long_strike'. None when no spread can be built: no quotes of the
        requested type at the entry time, no contract at or above the lower
        delta bound, or no listed long strike at or beyond the requested width.
    """
    if historical_option_data.empty:
        return None
    current_date = historical_option_data.iloc[0]['quote_datetime'][:10]
    entry_timestamp = current_date + ' ' + option_strat.trade_entry_time
    contract_type = option_strat.option_type.value
    # Only contracts of the requested type may become a leg of the spread.
    opening_quotes = historical_option_data[
        (historical_option_data['quote_datetime'] == entry_timestamp)
        & (historical_option_data['option_type'] == contract_type)
    ]
    if opening_quotes.empty:
        return None

    is_call = option_strat.option_type == OptionType.CALL
    deltas = opening_quotes['delta']
    delta_increment = 0.0
    while True:
        widened_upper_bound = option_strat.delta_upper_bound + delta_increment
        if is_call:
            in_range = (deltas < widened_upper_bound) & (deltas >= option_strat.delta_lower_bound)
        else:
            in_range = (deltas > -widened_upper_bound) & (deltas <= -option_strat.delta_lower_bound)
        short_contract_candidates = opening_quotes[in_range]
        if not short_contract_candidates.empty:
            break
        if widened_upper_bound > 1.0:
            # A delta magnitude cannot exceed 1, so widening further can never
            # match; the unbounded search used to spin forever here.
            logging.warning('No short contract with a delta of at least %s on %s.', option_strat.delta_lower_bound, current_date)
            return None
        delta_increment = delta_increment + 0.01

    # Might return more than one, take the last one listed.
    # NOTE(review): that is the greatest strike only if the quotes are sorted by strike - confirm.
    short_contract = short_contract_candidates.iloc[-1]
    short_strike = short_contract['strike']
    logging.info('Short Strike: %s', short_strike)

    long_strike = short_strike + (option_strat.spread_width if is_call else -option_strat.spread_width)
    # Sometimes the strike we're interested in doesn't exist. Find the nearest one that does,
    # stepping further out of the money, and give up once the listed chain is exhausted.
    available_strikes = set(opening_quotes['strike'])
    increment = STRIKE_MULTIPLE if is_call else -STRIKE_MULTIPLE
    furthest_strike = max(available_strikes) if is_call else min(available_strikes)
    while long_strike not in available_strikes:
        if (long_strike - furthest_strike) * increment > 0:
            logging.warning('No long strike listed at or beyond %s on %s.', long_strike, current_date)
            return None
        long_strike = long_strike + increment
    logging.info('Long Strike: %s', long_strike)

    contract_history = historical_option_data[historical_option_data['option_type'] == contract_type]
    short_strike_history = contract_history[contract_history['strike'] == short_strike].set_index('quote_datetime')
    long_strike_history = contract_history[contract_history['strike'] == long_strike].set_index('quote_datetime')
    spread_history = short_strike_history.join(long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime')
    return spread_history
def _spread_mid_price(quote: pd.Series) -> float:
    """Return the spread price from quotes: short-leg bid/ask midpoint minus long-leg midpoint."""
    short_mid = (quote['ask_short_strike'] + quote['bid_short_strike']) / 2.0
    long_mid = (quote['ask_long_strike'] + quote['bid_long_strike']) / 2.0
    return short_mid - long_mid


def _spread_bar_price(quote: pd.Series, extreme: str) -> float:
    """Return the spread price from the bar's `extreme` ('high' or 'low') leg prices.

    Falls back to the mid price when the short leg's value is not above the
    long leg's (which includes missing values).
    """
    short_leg = quote[f'{extreme}_short_strike']
    long_leg = quote[f'{extreme}_long_strike']
    if short_leg > long_leg:
        return short_leg - long_leg
    return _spread_mid_price(quote)


def _backtest_iron_condor(
    historical_option_data: pd.DataFrame,
    call_spread_strat: OptionStrat,
    put_spread_strat: OptionStrat
) -> BacktestResult:
    """Backtest one day of a short iron condor with a per-spread stop loss.

    Both spreads are sold at `call_spread_strat.trade_entry_time`. Each spread
    is stopped out once its price reaches (stop_loss_multiple + 1) times its
    entry price; spreads that are never stopped are closed at the last
    processed bar when still worth more than 0.05.

    Args:
        historical_option_data: One day of option quotes.
        call_spread_strat: Configuration of the call credit spread.
        put_spread_strat: Configuration of the put credit spread.

    Returns:
        The day's result. `trade_entered` is False (and every amount 0.0) when
        get_spread_history() cannot build one of the spreads.

    Side effects:
        Appends one entry to each of the module-level lists `dates`,
        `max_drawdowns`, `max_profits`, `wins` and `exit_times` when a trade
        was entered.
    """
    entry_time = call_spread_strat.trade_entry_time
    call_spread_history = get_spread_history(historical_option_data, call_spread_strat)
    put_spread_history = get_spread_history(historical_option_data, put_spread_strat)
    if call_spread_history is None or put_spread_history is None:
        # No tradable condor today (e.g. no quotes at the entry time). Report a
        # skipped day instead of failing on the missing history.
        if historical_option_data.empty:
            skipped_date = ''
        else:
            skipped_date = historical_option_data.iloc[0]['quote_datetime'][:10]
        logging.warning('No trade entered on %s: spread history unavailable.', skipped_date)
        return BacktestResult(
            date=skipped_date,
            entry_time=f'{skipped_date} {entry_time}',
            exit_time=f'{skipped_date} {entry_time}',
            trade_entered=False,
            trade_pnl=0.0,
            profit=0.0,
            credit=0.0,
            mfe=0.0,
            mae=0.0
        )

    current_date = call_spread_history.iloc[0].name[:10]
    entry_timestamp = current_date + ' ' + entry_time
    original_call_spread_price = _spread_mid_price(call_spread_history.loc[entry_timestamp])
    original_put_spread_price = _spread_mid_price(put_spread_history.loc[entry_timestamp])

    # Calculate entry slippage: round the credit down to a multiple of 0.05.
    if original_call_spread_price > 0.05:
        original_call_spread_price = original_call_spread_price - (original_call_spread_price % 0.05)
    logging.info('Original Call Spread Price: %s', original_call_spread_price)
    if original_put_spread_price > 0.05:
        original_put_spread_price = original_put_spread_price - (original_put_spread_price % 0.05)
    logging.info('Original Put Spread Price: %s', original_put_spread_price)

    premium_received = original_call_spread_price + original_put_spread_price
    call_stop_price = original_call_spread_price * (call_spread_strat.stop_loss_multiple + 1)
    put_stop_price = original_put_spread_price * (put_spread_strat.stop_loss_multiple + 1)

    trades_entered = False
    call_spread_stopped_out = False
    put_spread_stopped_out = False
    max_profit = 0.0
    max_drawdown = 0.0
    exit_time = '16:00:00'
    # Start from the entry prices so the closing logic below still has values
    # when no bar follows the entry.
    current_call_spread_price = original_call_spread_price
    current_put_spread_price = original_put_spread_price
    # Bars stamped after the 16:00:00 close are ignored.
    after_close_suffixes = ('16:05:00', '16:10:00', '16:15:00')

    if len(call_spread_history) != len(put_spread_history):
        # The two histories are paired by position, not by timestamp.
        logging.warning('Call and put spread histories differ in length on %s.', current_date)

    for (timestamp, call_spread), (_, put_spread) in zip(call_spread_history.iterrows(), put_spread_history.iterrows()):
        if timestamp.endswith(entry_time):
            trades_entered = True
            continue
        if not trades_entered:
            continue
        if timestamp.endswith(after_close_suffixes):
            continue

        # Stops are evaluated against the bar's high-based spread prices.
        current_call_spread_price = _spread_bar_price(call_spread, 'high')
        current_put_spread_price = _spread_bar_price(put_spread, 'high')

        if not call_spread_stopped_out and current_call_spread_price >= call_stop_price:
            premium_received -= call_stop_price
            # Calculate exit slippage.
            premium_received -= 0.10 # TODO: Make this configurable.
            call_spread_stopped_out = True
            exit_time = timestamp[-8:]
            logging.info('Call Spread Stopped Out')

        if not put_spread_stopped_out and current_put_spread_price >= put_stop_price:
            premium_received -= put_stop_price
            premium_received -= 0.10 # TODO: Make this configurable.
            put_spread_stopped_out = True
            exit_time = timestamp[-8:]
            logging.info('Put Spread Stopped Out')

        # For the mark-to-market figures, keep the high-based price on the more
        # expensive side and re-price the other side from the bar's lows.
        if not (call_spread_stopped_out and put_spread_stopped_out):
            if current_call_spread_price > current_put_spread_price:
                current_put_spread_price = _spread_bar_price(put_spread, 'low')
            else:
                current_call_spread_price = _spread_bar_price(call_spread, 'low')
        # A stopped-out spread is carried at its stop price; the other side is
        # then marked from the bar's highs again.
        if call_spread_stopped_out:
            current_call_spread_price = call_stop_price
            current_put_spread_price = _spread_bar_price(put_spread, 'high')
        if put_spread_stopped_out:
            current_put_spread_price = put_stop_price
            current_call_spread_price = _spread_bar_price(call_spread, 'high')

        current_profit = (original_call_spread_price - current_call_spread_price) + (original_put_spread_price - current_put_spread_price)
        current_profit_dollars = current_profit * call_spread_strat.number_of_contracts * 100
        if current_profit_dollars > max_profit:
            max_profit = current_profit_dollars
        if current_profit_dollars < max_drawdown:
            max_drawdown = current_profit_dollars

    # Spreads that were never stopped out are bought back at their last price,
    # unless that price is 0.05 or less.
    if not call_spread_stopped_out and current_call_spread_price > 0.05:
        premium_received -= current_call_spread_price
    if not put_spread_stopped_out and current_put_spread_price > 0.05:
        premium_received -= current_put_spread_price

    number_of_contracts = call_spread_strat.number_of_contracts
    stop_out_fees = 0.0 # It costs money to get stopped out.
    if call_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)
    if put_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)
    fees = 4 * FEES_PER_CONTRACT * number_of_contracts
    commissions = 4 * number_of_contracts
    premium_received = (premium_received * number_of_contracts * 100) - (fees + commissions) - stop_out_fees

    dates.append(current_date)
    max_drawdowns.append(max_drawdown)
    max_profits.append(max_profit)
    wins.append(premium_received > 0)
    exit_times.append(exit_time)

    result = BacktestResult(
        date=current_date,
        entry_time=f'{current_date} {entry_time}',
        exit_time=f'{current_date} {exit_time}',
        trade_entered=True,
        trade_pnl=premium_received,
        profit=0.0, # TODO: Calculated elsewhere. Clean this up.
        credit=original_call_spread_price + original_put_spread_price,
        mfe=max_profit,
        mae=max_drawdown
    )
    logging.info('Premium Received: %f', premium_received)
    return result
def backtest_iron_condor(
    strategy_name: str,
    call_spread_strat: OptionStrat,
    put_spread_strat: OptionStrat,
    start_date: datetime,
    end_date: datetime
) -> pd.DataFrame:
    """Backtest the iron condor on every daily quote file in a date range.

    Quote files are read from OPTION_DATA_DIRECTORY/<year>/<YYYY-MM-DD>.csv and
    processed in chronological order, so 'Cumulative Profit' is a true running
    total.

    Args:
        strategy_name: Label stored in the 'Strategy' column.
        call_spread_strat: Configuration of the call credit spread.
        put_spread_strat: Configuration of the put credit spread.
        start_date: First date to include (inclusive).
        end_date: Last date to include (inclusive).

    Returns:
        One row per processed day with the columns 'Date', 'Symbol',
        'Strategy', 'Entry Time', 'Exit Time', 'Profit' and 'Cumulative
        Profit'. The frame has these columns even when no file matched.

    Raises:
        ValueError: If OPTION_DATA_DIRECTORY is not configured.
    """
    if not OPTION_DATA_DIRECTORY:
        raise ValueError('OPTION_DATA_DIRECTORY is not set; it must point at the historical option data.')

    total_premium_received = 0.0
    total_trades = 0
    total_wins = 0
    results = []
    for year in range(start_date.year, end_date.year + 1):
        year_directory = os.path.join(OPTION_DATA_DIRECTORY, str(year))
        if not os.path.isdir(year_directory):
            logging.warning('Skipping missing data directory: %s', year_directory)
            continue
        # os.listdir() returns entries in arbitrary order. 'YYYY-MM-DD.csv'
        # names sort chronologically, which the running total depends on.
        for file_name in sorted(os.listdir(year_directory)):
            historical_data_file = os.path.join(year_directory, file_name)
            if os.path.isdir(historical_data_file) or not file_name.endswith('.csv'):
                continue
            # Assuming file format 'YYYY-MM-DD.csv'.
            file_date_str = os.path.splitext(file_name)[0]
            try:
                file_date = datetime.strptime(file_date_str, '%Y-%m-%d')
            except ValueError:
                logging.warning('Skipping file with unexpected name: %s', historical_data_file)
                continue
            if file_date < start_date or file_date > end_date:
                continue
            print('Processing File:', historical_data_file)
            logging.info('Processing File: %s', historical_data_file)
            historical_option_data = pd.read_csv(historical_data_file)
            backtest_result = _backtest_iron_condor(historical_option_data, call_spread_strat, put_spread_strat)
            total_premium_received += backtest_result.trade_pnl
            backtest_result.profit = total_premium_received
            results.append(backtest_result)
            if backtest_result.trade_entered:
                total_trades += 1
                if backtest_result.trade_pnl > 0.0:
                    total_wins += 1
            logging.info('Overall PnL: %f', total_premium_received)
            logging.info('Win Rate: %f', (total_wins / total_trades) if total_trades > 0 else 0.0)
            logging.info('Average Premium Received: %f', (total_premium_received / total_trades) if total_trades > 0 else 0.0)

    # TODO: Either look up the symbol in the historical option data or have the client provide it.
    rows = [{
        'Date': result.date,
        'Symbol': 'SPX',
        'Strategy': strategy_name,
        'Entry Time': result.entry_time,
        'Exit Time': result.exit_time,
        'Profit': result.trade_pnl,
        'Cumulative Profit': result.profit
    } for result in results]
    # Explicit columns keep the schema stable when no file was processed.
    return pd.DataFrame(rows, columns=[
        'Date', 'Symbol', 'Strategy', 'Entry Time', 'Exit Time', 'Profit', 'Cumulative Profit'
    ])
def create_strategies(entry_time, number_of_contracts=1):
    """Build the call-spread and put-spread configurations of one iron condor.

    Both sides share every setting except the option type.

    Args:
        entry_time: Entry time of day as an 'HH:MM:SS' string.
        number_of_contracts: Contracts traded per leg.

    Returns:
        A (call_spread_strat, put_spread_strat) tuple of OptionStrat objects.
    """
    shared_settings = {
        'delta_upper_bound': 0.11,
        'delta_lower_bound': 0.10,
        'credit_target': 1.50,
        'max_loss': 5000,
        'number_of_contracts': number_of_contracts,
        'spread_width': 50,
        'stop_loss_multiple': 1.00,
        'stop_loss_percent': 1.0,
        'trade_entry_time': entry_time,
    }
    call_side = OptionStrat(option_type=OptionType.CALL, **shared_settings)
    put_side = OptionStrat(option_type=OptionType.PUT, **shared_settings)
    return call_side, put_side
if __name__ == '__main__':
    # Script entry point: backtest a 10:05 iron condor from a fixed start date
    # up to the current time, print the result table and chart it.
    start_date = datetime(2024, 1, 12)
    end_date = datetime.now()
    call_spread_strat, put_spread_strat = create_strategies(entry_time='10:05:00')
    strategy_label = f'Iron Condor @ {call_spread_strat.trade_entry_time}'
    backtest_result = backtest_iron_condor(
        strategy_name=strategy_label,
        call_spread_strat=call_spread_strat,
        put_spread_strat=put_spread_strat,
        start_date=start_date,
        end_date=end_date,
    )
    print(backtest_result)
    plot(backtest_result, title='Iron Condor Backtest Results')