"""Backtest iron condor option strategies against historical intraday option quote files."""
import logging
|
|
import os
|
|
import pandas as pd
|
|
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
from .credit_target_strategy import CreditTargetStrategy
|
|
from .delta_target_strategy import DeltaTargetStrategy
|
|
from .option_spread_strategy import OptionSpreadStrategy
|
|
from .option_type import OptionType
|
|
|
|
# Pull variables from a local .env file into the process environment before
# any constant below reads them.
load_dotenv()

# Root logger threshold: warnings and above only, so the logging.info(...)
# calls in this module stay silent unless the level is lowered.
logging.basicConfig(level=logging.WARNING)
|
|
# Fee charged per option contract; applied per leg when computing trade PnL.
FEES_PER_CONTRACT: float = 0.80
# Wall-clock session boundaries, formatted 'HH:MM:SS'.
MARKET_CLOSE: str = '16:00:00'
MARKET_OPEN: str = '09:35:00'
# Root directory of the historical option data; None when the variable is unset.
OPTION_DATA_DIRECTORY = os.environ.get('OPTION_DATA_DIRECTORY')
# Strike grid spacing, in index points.
STRIKE_MULTIPLE: float = 5.0
|
|
@dataclass
class BacktestResult:
    """Outcome of backtesting one trading day (one historical data file)."""

    # Trading day, 'YYYY-MM-DD'.
    date: str
    # Entry timestamp, '<date> <HH:MM:SS>'.
    entry_time: str
    # Exit timestamp, '<date> <HH:MM:SS>'.
    exit_time: str
    # One details dict per spread (legs plus open/high/low/close prices).
    spreads: list
    # Whether a trade was opened on this day.
    trade_entered: bool
    # Net result of the day's trade in dollars, after fees and commissions.
    trade_pnl: float
    # Running cumulative PnL; filled in by backtest_iron_condor after the fact.
    profit: float
    # Combined entry price of both spreads (per share, not per contract).
    credit: float
    # Maximum favorable excursion: best open profit seen, in dollars.
    mfe: float
    # Maximum adverse excursion: worst open profit seen, in dollars.
    mae: float
|
|
# Per-day metric accumulators. _backtest_iron_condor appends one entry to each
# list per processed file. NOTE: that function is dispatched through a
# ProcessPoolExecutor, so the appends land in the worker process's copy of
# these lists, not in the parent's.
dates: list = []           # trading day, 'YYYY-MM-DD'
max_drawdowns: list = []   # worst open profit of the day, in dollars
max_profits: list = []     # best open profit of the day, in dollars
wins: list = []            # True when the day's net PnL was positive
exit_times: list = []      # 'HH:MM:SS' of the last stop-out, or the default close
|
|
def get_spread_history_credit(historical_option_data: pd.DataFrame, option_strat: CreditTargetStrategy) -> pd.DataFrame:
    """Build the intraday quote history of a credit-targeted vertical spread.

    The short leg is the contract of ``option_strat.option_type`` whose bid at
    the entry time is closest to ``option_strat.credit_target``. The long leg
    sits ``option_strat.spread_width`` away from it (higher strike for calls,
    lower for puts). When that strike is not quoted, the search keeps moving
    away from the short strike in 5-point steps until a quoted strike is found.

    Args:
        historical_option_data: One day of option quotes. Must provide the
            ``quote_datetime`` ('YYYY-MM-DD HH:MM:SS' strings), ``option_type``,
            ``strike`` and ``bid`` columns.
        option_strat: Strategy supplying ``trade_entry_time``, ``option_type``,
            ``credit_target`` and ``spread_width``.

    Returns:
        The short-leg rows indexed by ``quote_datetime`` and left-joined with
        the long-leg rows; columns carry the ``_short_strike`` /
        ``_long_strike`` suffixes. ``None`` when no spread can be built: the
        data is empty, there are no quotes at the entry time, no contract of
        the requested type is quoted, or no long strike exists within the
        quoted chain.
    """
    if historical_option_data.empty:
        return None

    current_date = historical_option_data.iloc[0]['quote_datetime'][:10]
    entry_timestamp = current_date + ' ' + option_strat.trade_entry_time
    opening_quotes = historical_option_data[historical_option_data['quote_datetime'] == entry_timestamp]
    if opening_quotes.empty:
        return None

    contract_type = option_strat.option_type.value
    opening_quotes_by_contract_type = opening_quotes[opening_quotes['option_type'] == contract_type]
    if opening_quotes_by_contract_type.empty:
        return None

    # Short leg: the bid closest to the credit we want to collect.
    credit_diff = (opening_quotes_by_contract_type['bid'] - option_strat.credit_target).abs()
    short_contract = opening_quotes_by_contract_type.loc[credit_diff.idxmin()]
    short_strike = short_contract['strike']
    logging.info('Short Strike: %s', short_strike)

    is_call = option_strat.option_type == OptionType.CALL
    long_strike = short_strike + (option_strat.spread_width if is_call else -option_strat.spread_width)

    # Sometimes the strike we're interested in doesn't exist. Walk further out
    # until one does, but give up once we leave the quoted chain so a missing
    # strike can never spin forever.
    quoted_strikes = opening_quotes_by_contract_type['strike']
    lowest_strike = quoted_strikes.min()
    highest_strike = quoted_strikes.max()
    strike_step = 5.0 if is_call else -5.0
    while not (quoted_strikes == long_strike).any():
        long_strike = long_strike + strike_step
        inside_chain = long_strike <= highest_strike if is_call else long_strike >= lowest_strike
        if not inside_chain:
            logging.warning('No long strike available beyond short strike %s at %s', short_strike, entry_timestamp)
            return None
    logging.info('Long Strike: %s', long_strike)

    of_contract_type = historical_option_data['option_type'] == contract_type
    short_strike_history = historical_option_data[(historical_option_data['strike'] == short_strike) & of_contract_type].set_index('quote_datetime')
    long_strike_history = historical_option_data[(historical_option_data['strike'] == long_strike) & of_contract_type].set_index('quote_datetime')
    return short_strike_history.join(long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime')
|
|
def get_spread_history(historical_option_data: pd.DataFrame, option_strat: DeltaTargetStrategy) -> pd.DataFrame:
    """Build the intraday quote history of a delta-targeted vertical spread.

    The short leg is the contract of ``option_strat.option_type`` whose delta
    at the entry time is closest to ``option_strat.delta_upper_bound`` (negated
    for puts). The long leg sits ``option_strat.spread_width`` away from it
    (higher strike for calls, lower for puts). When that strike is not quoted,
    the search keeps moving away from the short strike in 5-point steps until
    a quoted strike is found.

    Args:
        historical_option_data: One day of option quotes. Must provide the
            ``quote_datetime`` ('YYYY-MM-DD HH:MM:SS' strings), ``option_type``,
            ``strike`` and ``delta`` columns.
        option_strat: Strategy supplying ``trade_entry_time``, ``option_type``,
            ``delta_upper_bound`` and ``spread_width``.

    Returns:
        The short-leg rows indexed by ``quote_datetime`` and left-joined with
        the long-leg rows; columns carry the ``_short_strike`` /
        ``_long_strike`` suffixes. ``None`` when no spread can be built: the
        data is empty, there are no quotes at the entry time, no contract of
        the requested type is quoted, or no long strike exists within the
        quoted chain.
    """
    if historical_option_data.empty:
        return None

    current_date = historical_option_data.iloc[0]['quote_datetime'][:10]
    entry_timestamp = current_date + ' ' + option_strat.trade_entry_time
    opening_quotes = historical_option_data[historical_option_data['quote_datetime'] == entry_timestamp]
    if opening_quotes.empty:
        return None

    # Only contracts of the strategy's own type are candidates; the leg
    # histories below are filtered the same way, so picking a strike from the
    # other type would yield an empty or mismatched history.
    contract_type = option_strat.option_type.value
    opening_quotes = opening_quotes[opening_quotes['option_type'] == contract_type]
    if opening_quotes.empty:
        return None

    # Put deltas are negative, so the target is mirrored for puts.
    if option_strat.option_type == OptionType.PUT:
        delta_diff = (opening_quotes['delta'] + option_strat.delta_upper_bound).abs()
    else:
        delta_diff = (opening_quotes['delta'] - option_strat.delta_upper_bound).abs()
    short_contract = opening_quotes.loc[delta_diff.idxmin()]
    short_strike = short_contract['strike']
    logging.info('Short Strike: %s', short_strike)

    is_call = option_strat.option_type == OptionType.CALL
    long_strike = short_strike + (option_strat.spread_width if is_call else -option_strat.spread_width)

    # Sometimes the strike we're interested in doesn't exist. Walk further out
    # until one does, but give up once we leave the quoted chain so a missing
    # strike can never spin forever.
    quoted_strikes = opening_quotes['strike']
    lowest_strike = quoted_strikes.min()
    highest_strike = quoted_strikes.max()
    strike_step = 5.0 if is_call else -5.0
    while not (quoted_strikes == long_strike).any():
        long_strike = long_strike + strike_step
        inside_chain = long_strike <= highest_strike if is_call else long_strike >= lowest_strike
        if not inside_chain:
            logging.warning('No long strike available beyond short strike %s at %s', short_strike, entry_timestamp)
            return None
    logging.info('Long Strike: %s', long_strike)

    of_contract_type = historical_option_data['option_type'] == contract_type
    short_strike_history = historical_option_data[(historical_option_data['strike'] == short_strike) & of_contract_type].set_index('quote_datetime')
    long_strike_history = historical_option_data[(historical_option_data['strike'] == long_strike) & of_contract_type].set_index('quote_datetime')
    return short_strike_history.join(long_strike_history, lsuffix='_short_strike', rsuffix='_long_strike', on='quote_datetime')
|
|
def _mid_spread_price(quotes) -> float:
    """Return the spread's mid price: short-leg bid/ask midpoint minus the long leg's."""
    short_mid = (quotes['ask_short_strike'] + quotes['bid_short_strike']) / 2.0
    long_mid = (quotes['ask_long_strike'] + quotes['bid_long_strike']) / 2.0
    return short_mid - long_mid


def _bar_spread_price(quotes, bar_field: str) -> float:
    """Price the spread from one bar field ('high' or 'low') of both legs.

    Falls back to the mid price when the short leg's value does not exceed the
    long leg's, i.e. when the bar values would give a non-positive spread.
    """
    short_value = quotes[f'{bar_field}_short_strike']
    long_value = quotes[f'{bar_field}_long_strike']
    if short_value > long_value:
        return short_value - long_value
    return _mid_spread_price(quotes)


def _record_spread_price(details: dict, price: float) -> None:
    """Fold `price` into the running high/low/close of a spread details dict."""
    # Compare against None explicitly: a recorded 0.0 is a real price and must
    # not be mistaken for "nothing recorded yet".
    running_high = float('-inf') if details['high'] is None else details['high']
    running_low = float('inf') if details['low'] is None else details['low']
    details['high'] = max(running_high, price)
    details['low'] = min(running_low, price)
    details['close'] = price


def _backtest_iron_condor(
    historical_data_file: str,
    call_spread_strategy: OptionSpreadStrategy,
    put_spread_strategy: OptionSpreadStrategy
) -> BacktestResult:
    """Backtest a single-day iron condor against one historical quote file.

    Both spreads are opened at ``call_spread_strategy.trade_entry_time`` at
    their mid price (rounded down to a 0.05 tick). Each later bar is then
    walked: a spread is stopped out once its price reaches
    ``(stop_loss_multiple + 1)`` times its entry price, and the best/worst open
    profit of the day is tracked. Whatever is still open after the last bar is
    bought back at its last computed price when that exceeds 0.05.

    Args:
        historical_data_file: Path of the CSV file holding one day of quotes.
        call_spread_strategy: Strategy for the call spread. Also supplies the
            entry time and the number of contracts for the whole trade.
        put_spread_strategy: Strategy for the put spread.

    Returns:
        The day's ``BacktestResult``, or ``None`` when a spread history could
        not be built for the entry time.

    Side effects:
        Prints the file being processed and appends the day's figures to the
        module-level ``dates``, ``max_drawdowns``, ``max_profits``, ``wins``
        and ``exit_times`` lists.
    """
    print('Processing File:', historical_data_file)
    historical_option_data = pd.read_csv(historical_data_file)

    # The call strategy's type decides how BOTH spreads pick their strikes.
    if isinstance(call_spread_strategy, CreditTargetStrategy):
        build_spread_history = get_spread_history_credit
    else:
        build_spread_history = get_spread_history
    call_spread_history = build_spread_history(historical_option_data, call_spread_strategy)
    put_spread_history = build_spread_history(historical_option_data, put_spread_strategy)

    entry_time = call_spread_strategy.trade_entry_time

    if call_spread_history is None or put_spread_history is None:
        # This can happen when the market closes early for the day.
        logging.warning('No spread history found in %s for %s', historical_data_file, entry_time)
        return None

    current_date = call_spread_history.iloc[0].name[:10]
    entry_timestamp = current_date + ' ' + entry_time

    call_spread_entry = call_spread_history.loc[entry_timestamp]
    original_call_spread_price = _mid_spread_price(call_spread_entry)

    put_spread_entry = put_spread_history.loc[entry_timestamp]
    original_put_spread_price = _mid_spread_price(put_spread_entry)

    # Calculate entry slippage: round each credit down to the 0.05 tick.
    if original_call_spread_price > 0.05:
        original_call_spread_price = original_call_spread_price - (original_call_spread_price % 0.05)
    logging.info('Original Call Spread Price: %s', original_call_spread_price)

    if original_put_spread_price > 0.05:
        original_put_spread_price = original_put_spread_price - (original_put_spread_price % 0.05)
    logging.info('Original Put Spread Price: %s', original_put_spread_price)

    premium_received = original_call_spread_price + original_put_spread_price

    call_spread_details = {
        "legs": [{"action": "SELL", "strike": call_spread_entry['strike_short_strike'], "type": "CALL"},
                 {"action": "BUY", "strike": call_spread_entry['strike_long_strike'], "type": "CALL"}],
        "open": original_call_spread_price,
        "high": None,
        "low": None,
        "close": None
    }

    put_spread_details = {
        "legs": [{"action": "SELL", "strike": put_spread_entry['strike_short_strike'], "type": "PUT"},
                 {"action": "BUY", "strike": put_spread_entry['strike_long_strike'], "type": "PUT"}],
        "open": original_put_spread_price,
        "high": None,
        "low": None,
        "close": None
    }

    # Price at which each spread is considered stopped out.
    call_stop_price = original_call_spread_price * (call_spread_strategy.stop_loss_multiple + 1)
    put_stop_price = original_put_spread_price * (put_spread_strategy.stop_loss_multiple + 1)

    trades_entered = False
    call_spread_stopped_out = False
    put_spread_stopped_out = False

    max_profit = 0.0
    max_drawdown = 0.0
    exit_time = '16:00:00'

    current_call_spread_price = original_call_spread_price
    current_put_spread_price = original_put_spread_price

    # Both histories are walked by row position and are assumed to be
    # row-aligned; timestamps are always read from the call history.
    for row_position in range(len(call_spread_history)):
        call_spread = call_spread_history.iloc[row_position]
        put_spread = put_spread_history.iloc[row_position]
        bar_timestamp = call_spread.name

        if bar_timestamp.endswith(entry_time):
            trades_entered = True
            continue

        if not trades_entered:
            continue

        # Bars stamped after the 16:00:00 close are skipped.
        if bar_timestamp.endswith(('16:05:00', '16:10:00', '16:15:00')):
            continue

        # Worst case for a short spread within the bar: price it off the highs.
        current_call_spread_price = _bar_spread_price(call_spread, 'high')
        current_put_spread_price = _bar_spread_price(put_spread, 'high')

        _record_spread_price(call_spread_details, current_call_spread_price)
        _record_spread_price(put_spread_details, current_put_spread_price)

        if not call_spread_stopped_out and current_call_spread_price >= call_stop_price:
            premium_received -= call_stop_price
            call_spread_details['close'] = call_stop_price + 0.10
            # Calculate exit slippage.
            premium_received -= 0.10  # TODO: Make this configurable.
            call_spread_stopped_out = True
            exit_time = bar_timestamp[-8:]
            logging.info('Call Spread Stopped Out')

        if not put_spread_stopped_out and current_put_spread_price >= put_stop_price:
            premium_received -= put_stop_price
            premium_received -= 0.10  # TODO: Make this configurable.
            put_spread_details['close'] = put_stop_price + 0.10
            put_spread_stopped_out = True
            exit_time = bar_timestamp[-8:]
            logging.info('Put Spread Stopped Out')

        # NOTE(review): the nesting of this excursion-tracking section was
        # reconstructed (everything below runs only while at least one spread
        # is still open) - confirm against the intended behaviour.
        if not (call_spread_stopped_out and put_spread_stopped_out):
            # Both sides cannot be at their worst in the same bar: keep the
            # more expensive side on its high and price the other off its low.
            if current_call_spread_price > current_put_spread_price:
                current_put_spread_price = _bar_spread_price(put_spread, 'low')
            else:
                current_call_spread_price = _bar_spread_price(call_spread, 'low')

            # A stopped side is frozen at its stop price; the surviving side
            # goes back to its bar high.
            if call_spread_stopped_out:
                current_call_spread_price = call_stop_price
                current_put_spread_price = _bar_spread_price(put_spread, 'high')
            if put_spread_stopped_out:
                current_put_spread_price = put_stop_price
                current_call_spread_price = _bar_spread_price(call_spread, 'high')

            current_profit = (original_call_spread_price - current_call_spread_price) + (original_put_spread_price - current_put_spread_price)
            current_profit_dollars = current_profit * call_spread_strategy.number_of_contracts * 100
            if current_profit_dollars > max_profit:
                max_profit = current_profit_dollars
            if current_profit_dollars < max_drawdown:
                max_drawdown = current_profit_dollars

    # Buy back whatever is still open, unless it is worth 0.05 or less.
    if not call_spread_stopped_out and current_call_spread_price > 0.05:
        premium_received -= current_call_spread_price

    if not put_spread_stopped_out and current_put_spread_price > 0.05:
        premium_received -= current_put_spread_price

    number_of_contracts = call_spread_strategy.number_of_contracts
    stop_out_fees = 0.0  # It costs money to get stopped out.
    if call_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)
    if put_spread_stopped_out:
        stop_out_fees += (2 * FEES_PER_CONTRACT * number_of_contracts)
    fees = 4 * FEES_PER_CONTRACT * number_of_contracts
    commissions = 4 * number_of_contracts
    premium_received = (premium_received * number_of_contracts * 100) - (fees + commissions) - stop_out_fees

    dates.append(current_date)
    max_drawdowns.append(max_drawdown)
    max_profits.append(max_profit)
    wins.append(bool(premium_received > 0))
    exit_times.append(exit_time)

    result = BacktestResult(
        date=current_date,
        entry_time=f'{current_date} {entry_time}',
        exit_time=f'{current_date} {exit_time}',
        spreads=[call_spread_details, put_spread_details],
        trade_entered=True,
        trade_pnl=premium_received,
        profit=0.0,  # TODO: Calculated elsewhere. Clean this up.
        credit=original_call_spread_price + original_put_spread_price,
        mfe=max_profit,
        mae=max_drawdown
    )

    logging.info('Premium Received: %f', premium_received)
    return result
|
|
def backtest_iron_condor(
    backtest_name: str,
    call_spread_strategy: OptionSpreadStrategy,
    put_spread_strategy: OptionSpreadStrategy,
    start_date: datetime,
    end_date: datetime
) -> pd.DataFrame:
    """Backtest an iron condor over every daily quote file in a date range.

    Files are read from ``OPTION_DATA_DIRECTORY/<year>/<YYYY-MM-DD>.csv`` and
    each one is backtested in a worker process. Results are accumulated in
    chronological order so the cumulative profit column is a true running
    total.

    Args:
        backtest_name: Label written to the ``Strategy`` column.
        call_spread_strategy: Strategy for the call spread.
        put_spread_strategy: Strategy for the put spread.
        start_date: First day to include; only its calendar date is used.
        end_date: Last day to include; only its calendar date is used.

    Returns:
        One row per day that produced a result, with the columns ``Date``,
        ``Symbol``, ``Strategy``, ``Entry Time``, ``Exit Time``, ``Spreads``,
        ``Profit`` and ``Cumulative Profit``. The columns are present even
        when no day produced a result.
    """
    # Compare calendar dates so a start date that is not set to midnight
    # still includes its own day.
    first_day = start_date.date()
    last_day = end_date.date()

    futures = []
    with ProcessPoolExecutor(max_workers=10) as executor:
        for year in range(start_date.year, end_date.year + 1):
            year_directory = os.path.join(OPTION_DATA_DIRECTORY, str(year))
            # os.listdir gives no ordering guarantee; sorting the
            # 'YYYY-MM-DD.csv' names yields chronological order.
            for file in sorted(os.listdir(year_directory)):
                historical_data_file = os.path.join(year_directory, file)
                if os.path.isdir(historical_data_file) or not file.endswith('.csv'):
                    continue

                # Assuming file format 'YYYY-MM-DD.csv'.
                file_date_str = os.path.splitext(file)[0]
                try:
                    file_day = datetime.strptime(file_date_str, '%Y-%m-%d').date()
                except ValueError:
                    # A stray CSV should not abort the whole backtest.
                    logging.warning('Skipping %s: file name is not a YYYY-MM-DD date', historical_data_file)
                    continue

                if file_day < first_day or file_day > last_day:
                    continue

                logging.info('Processing File: %s', historical_data_file)

                future = executor.submit(_backtest_iron_condor,
                                         historical_data_file, call_spread_strategy, put_spread_strategy)
                futures.append(future)

    total_premium_received = 0.0
    total_trades = 0.0
    total_wins = 0.0

    # Futures are consumed in submission (chronological) order.
    backtest_results = []
    for future in futures:
        backtest_result = future.result()
        if backtest_result is None:
            # Days without a usable spread history yield no result.
            continue

        total_premium_received += backtest_result.trade_pnl
        backtest_result.profit = total_premium_received
        backtest_results.append(backtest_result)

        if backtest_result.trade_entered:
            total_trades += 1
            if backtest_result.trade_pnl > 0.0:
                total_wins += 1

    logging.info('Overall PnL: %f', total_premium_received)
    logging.info('Win Rate: %f', (total_wins / total_trades) if total_trades > 0 else 0.0)
    logging.info('Average Premium Received: %f', (total_premium_received / total_trades) if total_trades > 0 else 0.0)

    # TODO: Either look up the symbol in the historical option data or have the client provide it.
    result_columns = ['Date', 'Symbol', 'Strategy', 'Entry Time', 'Exit Time', 'Spreads', 'Profit', 'Cumulative Profit']
    backtest_results = pd.DataFrame([{
        'Date': result.date,
        'Symbol': 'SPX',
        'Strategy': backtest_name,
        'Entry Time': result.entry_time,
        'Exit Time': result.exit_time,
        'Spreads': result.spreads,
        'Profit': result.trade_pnl,
        'Cumulative Profit': result.profit
    } for result in backtest_results], columns=result_columns)

    return backtest_results