options-automation/iron_condor_scheduler.py
2024-01-29 08:44:37 -08:00

83 lines
2.6 KiB
Python

import schedule
import time
from database.backtest import backtest_results
from datetime import datetime, timedelta
from dotenv import load_dotenv
from ibkr import Client
from iron_condor import enter_iron_condor
from multiprocessing import Process
from os import getenv
from pytz import timezone
# Load variables from a local .env file into the process environment so the
# getenv() lookups for IBKR_HOST / IBKR_PORT further down can find them.
load_dotenv()
# strptime/strftime pattern for entry times, e.g. '09:35:00'.
entry_time_format = '%H:%M:%S'
# IANA timezone name used for "now" and for scheduling trade entries.
eastern_timezone = 'America/New_York'
def available_entry_times():
    """Return every candidate entry time, formatted with entry_time_format.

    The candidates run from 09:35:00 through 15:55:00 inclusive, spaced five
    minutes apart, in ascending order.
    """
    first_slot = datetime.strptime('09:35:00', entry_time_format)
    last_slot = datetime.strptime('15:55:00', entry_time_format)
    step = timedelta(minutes=5)
    # Whole number of steps between the first and last slot; +1 below makes
    # the range inclusive of the last slot.
    step_count = (last_slot - first_slot) // step
    return [
        (first_slot + index * step).strftime(entry_time_format)
        for index in range(step_count + 1)
    ]
def best_entry_times(symbol='SPX', lookback_days=5, top_n=10):
    """Return the entry times with the highest recent backtest profit.

    For each candidate from available_entry_times(), the backtest named
    '10 Delta Iron Condor @ <entry time>' is fetched and the 'Profit' column
    of its last ``lookback_days`` rows is summed. Candidates are then ranked
    by that sum, largest first.

    Args:
        symbol: First argument passed to backtest_results (default 'SPX').
        lookback_days: Number of trailing backtest rows to sum per entry
            time (presumably one row per trading day -- confirm against
            backtest_results).
        top_n: Maximum number of entry times to return.

    Returns:
        A list of at most ``top_n`` entry-time strings, ordered from the
        highest cumulative profit to the lowest. Ties keep the chronological
        order of available_entry_times().
    """
    # Derive "today" from the same Eastern clock the scheduler uses instead of
    # the host's local clock, so the window does not shift on machines in
    # other timezones. tzinfo is dropped again so backtest_results keeps
    # receiving naive midnight datetimes, exactly as before.
    end_date = datetime.now(timezone(eastern_timezone)).replace(
        hour=0, minute=0, second=0, microsecond=0, tzinfo=None)
    # TODO: Really want the N most recent trading days. A calendar window
    # twice as long (10 days for the default of 5) is fetched and then
    # trimmed with tail() below.
    start_date = end_date - timedelta(days=2 * lookback_days)

    cumulative_profits = {}
    for entry_time in available_entry_times():
        backtest = backtest_results(
            symbol, f'10 Delta Iron Condor @ {entry_time}',
            start_date=start_date, end_date=end_date)
        cumulative_profits[entry_time] = (
            backtest.tail(lookback_days)['Profit'].sum())
        print(backtest)

    # sorted() is stable, so equal profits stay in insertion (chronological)
    # order even with reverse=True.
    ranked = sorted(cumulative_profits, key=cumulative_profits.get,
                    reverse=True)
    return ranked[:top_n]
def enter_trade():
    """Start enter_iron_condor in a separate process and return immediately.

    The child process is not joined here, so the caller is not blocked while
    it runs.
    """
    Process(target=enter_iron_condor).start()
def connection_successful():
    """Report whether an IBKR Client can be constructed.

    Reads IBKR_HOST and IBKR_PORT from the environment and passes them to
    Client(host=..., port=...).

    Returns:
        True if constructing the Client raised no exception, False otherwise.
        On failure the exception is printed so the cause is visible.
    """
    ibkr_host = getenv('IBKR_HOST')
    ibkr_port = getenv('IBKR_PORT')
    try:
        Client(host=ibkr_host, port=ibkr_port)
    except Exception as error:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt and SystemExit still propagate to the caller.
        print(f'IBKR connection attempt failed: {error!r}')
        return False
    return True
if __name__ == '__main__':
    # Script entry point: verify the IBKR connection, schedule today's
    # remaining best entry times, then service the scheduler forever.
    if not connection_successful():
        print('ERROR: Cannot connect to IBKR. Ensure that TWS / Gateway is running.')
    else:
        print('Connected to IBKR.')
        now = datetime.now(timezone(eastern_timezone))
        entry_times = sorted(best_entry_times())
        print(entry_times)
        for entry_time in entry_times:
            # Place the entry's time of day on today's Eastern date so it can
            # be compared against the current moment.
            time_of_day = datetime.strptime(entry_time, '%H:%M:%S').time()
            scheduled_at = datetime.combine(
                now.date(), time_of_day, tzinfo=now.tzinfo)
            # Prevent scheduling for times that have already elapsed.
            if scheduled_at <= now:
                continue
            print(f'Scheduling for {entry_time}.')
            # NOTE(review): every().day registers a recurring job, so these
            # entries presumably fire again on later days if the process is
            # left running -- confirm that is intended.
            schedule.every().day.at(entry_time, eastern_timezone).do(enter_trade)
        # Poll once per second and run whichever jobs have come due.
        while True:
            schedule.run_pending()
            time.sleep(1)