# File: news-automation/news/scrape_news.py
# Listing metadata: 74 lines, 2.5 KiB, Python

import cloudscraper
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from time import sleep
from typing import Generator
def convert_datetime_to_url_format(date: datetime) -> str:
    """Format *date* as a lowercase ``mmmDD.YYYY`` string, e.g. ``oct25.2023``.

    The month is a three-letter English abbreviation, the day is zero-padded
    to two digits and the year is written in full.

    Args:
        date: The date to format; only year, month and day are used.

    Returns:
        The formatted string, such as ``"jan05.2024"``.
    """
    # The abbreviations are spelled out instead of using strftime("%b"),
    # because "%b" follows the process locale and would produce non-English
    # month names under a non-English LC_TIME setting.
    month_abbreviations = (
        "jan", "feb", "mar", "apr", "may", "jun",
        "jul", "aug", "sep", "oct", "nov", "dec",
    )
    month = month_abbreviations[date.month - 1]
    return f"{month}{date.day:02d}.{date.year}"
def scrape_news(date: datetime) -> pd.DataFrame:
    """Scrape the Forex Factory calendar page for a single day.

    Requests ``https://www.forexfactory.com/calendar?day=<date>`` through a
    cloudscraper session and collects the time, currency and title of every
    ``tr.calendar__row`` element on the page. Each scraped row is also
    printed to stdout.

    Args:
        date: The day whose calendar page is requested.

    Returns:
        A DataFrame with the columns ``Date``, ``Time``, ``Currency`` and
        ``Title`` (in that order), one row per scraped event. ``Date`` holds
        *date* formatted as ``YYYY-MM-DD``. When the server does not answer
        with HTTP 200, an empty DataFrame with the same columns is returned.
    """
    columns = ['Date', 'Time', 'Currency', 'Title']

    base_url = "https://www.forexfactory.com/calendar"
    formatted_date = convert_datetime_to_url_format(date)
    url = f"{base_url}?day={formatted_date}"

    scraper = cloudscraper.create_scraper()
    response = scraper.get(url)
    if response.status_code != 200:
        print(f"Failed to retrieve news. Status code: {response.status_code}")
        # Return an empty frame instead of None so the result always matches
        # the annotated return type and callers can treat every day alike.
        return pd.DataFrame(columns=columns)

    soup = BeautifulSoup(response.text, 'html.parser')
    rows = soup.find_all("tr", class_="calendar__row")

    previous_time = None
    times = []
    currencies = []
    titles = []
    for row in rows:
        # find() returns None when a row lacks one of the expected cells;
        # the resulting AttributeError marks the row as not scrapable.
        try:
            event_time = row.find("td", class_="calendar__cell calendar__time").text.strip()
            currency = row.find("td", class_="calendar__cell calendar__currency").text.strip()
            title = row.find("span", class_="calendar__event-title").text.strip()
        except AttributeError:
            print("Failed to scrape the row")
            continue

        # If the time is not present, use the previous time (this stays None
        # when no earlier row on the page carried a time).
        if not event_time:
            event_time = previous_time
        else:
            previous_time = event_time

        print(f"Time: {event_time}")
        print(f"Currency: {currency}")
        print(f"Title: {title}")
        print("---------------------------")

        times.append(event_time)
        currencies.append(currency)
        titles.append(title)

    news_data = pd.DataFrame({"Time": times, "Currency": currencies, "Title": titles})
    news_data['Date'] = date.strftime('%Y-%m-%d')
    # Reorder so that the date is the leading column.
    news_data = news_data[columns]
    return news_data
def scrape_news_in_range(start_date: datetime, end_date: datetime) -> Generator[pd.DataFrame, None, None]:
    """Lazily scrape the calendar for every day from start_date to end_date.

    Days are visited in one-day steps starting at *start_date*; a day is
    scraped as long as it is not later than *end_date*. Nothing is fetched
    until the generator is iterated.

    Args:
        start_date: First day to scrape.
        end_date: Last day to scrape (inclusive).

    Yields:
        The result of ``scrape_news`` for each day, in chronological order.
    """
    current_date = start_date
    while current_date <= end_date:
        print(f"Scraping data for {current_date}")
        news_data = scrape_news(current_date)
        yield news_data
        current_date += timedelta(days=1)
        # Play nice with the server, but only pause when another request
        # actually follows; there is no point sleeping after the last day.
        if current_date <= end_date:
            sleep(3)
if __name__ == '__main__':
    # Script entry point: scrape a single fixed day and print the result.
    start_date = datetime(2023, 10, 25)
    end_date = datetime(2023, 10, 25)
    # scrape_news_in_range is a generator, so it only does work while being
    # iterated; calling it without consuming the result has no effect.
    for news_data in scrape_news_in_range(start_date, end_date):
        print(news_data)