windows-automation/file-sync/sync_files.py

71 lines
3.1 KiB
Python
Raw Normal View History

import os
import schedule
import shutil
import subprocess
import time
from datetime import datetime, timedelta
from dotenv import load_dotenv
from pathlib import Path
# Example .env file:
# SOURCE_DIRECTORIES=DATA-DRIVE:\A;DATA-DRIVE:\B;DATA-DRIVE:\C
# DESTINATION_PATHS=\\NETWORK-SHARE\A;\\NETWORK-SHARE\B;\\NETWORK-SHARE\C
# USERNAME=CHANGE-ME
# PASSWORD=CHANGE-ME
# Let values from the .env file take precedence over variables that already
# exist in the process environment. Windows itself defines USERNAME (the
# logged-in account), so without override=True the USERNAME entry from the
# .env file would be ignored and the wrong account used for the network share.
load_dotenv(override=True)
def connect_to_network_drive(network_path, username, password):
    r"""Authenticate to a network share using the Windows ``net use`` command.

    Only one connection can be made to a network drive at a time, so any
    existing connection to the same server is dropped (best-effort) before
    the new one is opened.

    Args:
        network_path: UNC path of the share, e.g. ``\\SERVER\share``.
        username: Account name passed to ``net use`` through ``/user:``.
        password: Password for that account.

    Raises:
        ConnectionError: If the ``net use`` connect command exits with a
            non-zero status; the message carries the command's stderr.
    """
    # A UNC path begins with two backslashes, so a plain split('\\') yields
    # empty leading fields. Strip them first so the first field really is the
    # server name.
    server_name = network_path.lstrip('\\').split('\\')[0]

    # Use only the server name in the disconnect command, or else it won't
    # work. The outcome is deliberately ignored: the command fails harmlessly
    # when nothing is connected yet.
    # NOTE(review): confirm `net use \\SERVER /delete` is accepted on the
    # target Windows version.
    subprocess.run(['net', 'use', f'\\\\{server_name}', '/delete'])

    # Pass an argument list rather than a shell string so that spaces or shell
    # metacharacters (&, |, ^, ...) in the credentials are not interpreted.
    result = subprocess.run(
        ['net', 'use', network_path, f'/user:{username}', password],
        capture_output=True,
    )
    if result.returncode != 0:
        # Console output is not guaranteed to be UTF-8; never let decoding the
        # error text mask the real failure.
        error_text = result.stderr.decode(errors='replace')
        raise ConnectionError(f'Failed to connect to network drive: {error_text}')
def file_has_changed(source_file, destination_file):
    """Report whether *source_file* needs to be copied over *destination_file*.

    A destination that does not exist always counts as changed. Otherwise the
    files are treated as different when their modification times are not
    exactly equal.

    Returns:
        True if the destination is missing or the modification times differ,
        False otherwise.
    """
    destination_present = os.path.exists(destination_file)
    if destination_present:
        source_mtime = os.path.getmtime(source_file)
        destination_mtime = os.path.getmtime(destination_file)
        return source_mtime != destination_mtime
    return True
def copy_files(source_directory, destination_directory):
    """Copy new or modified files from one directory tree into another.

    Walks *source_directory* recursively, following symbolic links. Every
    file that ``file_has_changed`` reports as changed is copied (metadata
    included) to the same relative location under *destination_directory*,
    creating intermediate directories as needed. Progress is printed for each
    file checked and each file copied.
    """
    destination_root = Path(destination_directory)
    for current_dir, _subdirs, filenames in os.walk(source_directory, followlinks=True):
        for filename in filenames:
            source_file = Path(current_dir) / filename
            # Same relative location under the destination root.
            relative_file = source_file.relative_to(source_directory)
            destination_file = destination_root / relative_file
            print(f'Checking: {source_file}')
            if not file_has_changed(source_file, destination_file):
                continue
            destination_file.parent.mkdir(parents=True, exist_ok=True)
            # copy2 carries the file metadata across along with the contents.
            shutil.copy2(source_file, destination_file)
            print(f'Copied: {source_file} to {destination_file}')
def sync_directories(source_directory, destination_path):
    """Connect to a network share and copy changed files onto it.

    Credentials are read from the ``USERNAME`` and ``PASSWORD`` environment
    variables.

    I/O failures are reported on stdout and then swallowed instead of being
    raised. This function is registered as a ``schedule`` job, and an
    exception escaping a job propagates out of ``schedule.run_pending()``,
    which would terminate the scheduler loop and stop every future sync.

    Args:
        source_directory: Local directory tree to copy from.
        destination_path: UNC path of the network share to copy to.
    """
    username = os.getenv('USERNAME')
    password = os.getenv('PASSWORD')
    try:
        connect_to_network_drive(destination_path, username, password)
        copy_files(source_directory, destination_path)
    except OSError as error:
        # ConnectionError and the file-system errors raised while copying are
        # all OSError subclasses; anything else is a programming error and is
        # left to propagate.
        print(f'Sync failed for {source_directory} to {destination_path}: {error}')
def _read_path_list(variable_name):
    """Return the non-empty, ';'-separated entries of an environment variable."""
    raw_value = os.getenv(variable_name) or ''
    entries = [entry.strip() for entry in raw_value.split(';') if entry.strip()]
    if not entries:
        raise ValueError(f'Environment variable {variable_name} is not set or is empty.')
    return entries


def schedule_syncs(start_hour, start_minute, interval_minutes):
    """Register one daily sync job per source/destination pair.

    The pairs come from the ``SOURCE_DIRECTORIES`` and ``DESTINATION_PATHS``
    environment variables, each a ';'-separated list matched up by position.
    The first job runs at ``start_hour:start_minute`` and each following job
    runs ``interval_minutes`` after the previous one, all interpreted in the
    America/Los_Angeles time zone.

    Args:
        start_hour: Hour (0-23) of the first job.
        start_minute: Minute (0-59) of the first job.
        interval_minutes: Gap in minutes between consecutive jobs.

    Raises:
        ValueError: If either environment variable is unset or empty, or if
            the two lists do not contain the same number of entries.
    """
    source_directories = _read_path_list('SOURCE_DIRECTORIES')
    destination_paths = _read_path_list('DESTINATION_PATHS')
    # zip() would silently drop the unmatched entries, so fail loudly instead.
    if len(source_directories) != len(destination_paths):
        raise ValueError(
            'SOURCE_DIRECTORIES and DESTINATION_PATHS must contain the same number of entries '
            f'(got {len(source_directories)} and {len(destination_paths)}).'
        )

    # Only the HH:MM part of this timestamp is used; the date is irrelevant.
    start_time = datetime.now().replace(hour=start_hour, minute=start_minute, second=0, microsecond=0)
    for i, (source_directory, destination_path) in enumerate(zip(source_directories, destination_paths)):
        schedule_time = start_time + timedelta(minutes=i * interval_minutes)
        schedule_time_string = schedule_time.strftime('%H:%M')
        schedule.every().day.at(schedule_time_string, 'America/Los_Angeles').do(
            sync_directories, source_directory, destination_path
        )
        print(f'Scheduled sync for {source_directory} to {destination_path} at {schedule_time_string}.')
if __name__ == '__main__':
    # Register the daily jobs: the first runs at 3:00 AM PT and each
    # following one starts 30 minutes after the previous.
    schedule_syncs(start_hour=3, start_minute=0, interval_minutes=30)

    # Wake up once per poll interval and run whichever jobs have come due.
    poll_interval_seconds = 60
    while True:
        schedule.run_pending()
        time.sleep(poll_interval_seconds)