# options-automation/iron_condor_scheduler.py
#
# 70 lines
# 2.2 KiB
# Python

import schedule
import time
from database.backtest import backtest_results
from datetime import datetime, timedelta
from dotenv import load_dotenv
from ibkr import Client
from iron_condor import enter_iron_condor
from multiprocessing import Process
from os import getenv
# Load settings via python-dotenv at import time, before anything below calls
# getenv() (IBKR_HOST / IBKR_PORT are read in connection_successful()).
load_dotenv()
# strptime/strftime pattern shared by every entry-time string in this module
# (24-hour clock, e.g. '09:35:00').
entry_time_format = '%H:%M:%S'
def available_entry_times():
    """Return every candidate entry time as an 'HH:MM:SS' string.

    The candidates start at 09:35:00 and end at 15:55:00 (both included),
    spaced five minutes apart and listed in ascending order.
    """
    first_slot = datetime.strptime('09:35:00', entry_time_format)
    last_slot = datetime.strptime('15:55:00', entry_time_format)
    step = timedelta(minutes=5)
    # Whole steps between the two endpoints; the +1 keeps the final slot.
    slot_count = (last_slot - first_slot) // step + 1
    return [
        (first_slot + index * step).strftime(entry_time_format)
        for index in range(slot_count)
    ]
def best_entry_times():
    """Rank the candidate entry times by their recent backtested profit.

    For each time from available_entry_times(), the SPX
    '10 Delta Iron Condor @ <time>' backtest is fetched for the window from
    ten calendar days ago up to today's midnight (local clock), and the
    'Profit' values of its last five rows are summed. Each fetched backtest
    is also printed.

    Returns:
        At most ten entry-time strings, highest summed profit first.
    """
    today_midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    # TODO: Really want 5 most recent trading days.
    window_start = today_midnight - timedelta(days=10)

    profit_by_time = {}
    for candidate in available_entry_times():
        strategy_name = f'10 Delta Iron Condor @ {candidate}'
        # NOTE(review): result is presumably a pandas DataFrame with a
        # 'Profit' column -- confirm against database.backtest.
        results = backtest_results(
            'SPX',
            strategy_name,
            start_date=window_start,
            end_date=today_midnight,
        )
        profit_by_time[candidate] = results.tail(5)['Profit'].sum()
        print(results)

    # Stable sort over the keys: ties keep their chronological order.
    ranked = sorted(profit_by_time, key=profit_by_time.get, reverse=True)
    return ranked[:10]
def enter_trade():
    """Start enter_iron_condor in a new child process without waiting for it."""
    Process(target=enter_iron_condor).start()
def connection_successful():
    """Report whether an IBKR Client can be constructed.

    The host and port are read from the IBKR_HOST and IBKR_PORT environment
    variables and passed to Client unchanged (getenv yields strings, or None
    when a variable is unset).

    Returns:
        True if Client(...) was constructed without raising an Exception,
        False otherwise.
    """
    ibkr_host = getenv('IBKR_HOST')
    ibkr_port = getenv('IBKR_PORT')
    try:
        Client(host=ibkr_host, port=ibkr_port)
    except Exception:
        # Any ordinary failure counts as "not connected". Unlike a bare
        # `except:`, this lets KeyboardInterrupt and SystemExit propagate,
        # so Ctrl-C during a hanging connection attempt still stops the script.
        return False
    return True
# Script entry point: verify the IBKR connection, register one daily job per
# selected entry time (New York time), then poll the scheduler forever.
if __name__ == '__main__':
    if not connection_successful():
        print('ERROR: Cannot connect to IBKR. Ensure that TWS / Gateway is running.')
    else:
        print('Connected to IBKR.')
        chosen_times = best_entry_times()
        print(chosen_times)
        for slot in chosen_times:
            print(f'Scheduling for {slot}.')
            daily_job = schedule.every().day.at(slot, 'America/New_York')
            daily_job.do(enter_trade)
        # Check once per second for jobs that have come due; never returns.
        while True:
            schedule.run_pending()
            time.sleep(1)