Spaces:
Sleeping
Sleeping
| import yfinance as yf | |
| import pandas as pd | |
| from newsapi import NewsApiClient | |
| import os | |
| from datetime import datetime, timedelta | |
def load_api_keys():
    """Read the NewsAPI and Alpha Vantage keys from the process environment.

    A warning is printed when ``NEWS_API_KEY`` is unset or empty; a missing
    ``ALPHA_VANTAGE_KEY`` is not reported.

    Returns:
        tuple: ``(news_api_key, alpha_vantage_key)``; each element is the
        variable's string value, or None when the variable is not set.
    """
    env_names = ("NEWS_API_KEY", "ALPHA_VANTAGE_KEY")
    keys = tuple(os.environ.get(name) for name in env_names)
    if not keys[0]:
        print("Warning: NEWS_API_KEY environment variable not found.")
    return keys
def get_stock_data(ticker, start_date, end_date):
    """
    Download the price history of one ticker through yfinance.

    Args:
        ticker (str): The stock ticker symbol (e.g., 'AAPL').
        start_date (str): Start date in 'YYYY-MM-DD' format.
        end_date (str): End date in 'YYYY-MM-DD' format.

    Returns:
        pandas.DataFrame: The history with the index moved into a 'Date'
        column holding plain ``datetime.date`` values, or None when no rows
        come back or any exception is raised along the way.
    """
    try:
        history = yf.Ticker(ticker).history(start=start_date, end=end_date)
        if history.empty:
            print(f"No data found for {ticker} between {start_date} and {end_date}.")
            return None

        # Turn the date index into an ordinary column ...
        frame = history.reset_index()
        # ... and strip the time-of-day component from it.
        frame['Date'] = pd.to_datetime(frame['Date']).dt.date
        return frame
    except Exception as exc:
        # Best-effort fetch: report the failure and signal it with None.
        print(f"Error fetching stock data for {ticker}: {exc}")
        return None
def get_news_articles(query, from_date, to_date, language='en', sort_by='relevancy', page_size=100):
    """
    Query NewsAPI's "everything" search for articles in a date window.

    The API key is obtained from ``load_api_keys()``. A ``from_date`` that
    sorts before "29 days ago" is replaced by that cutoff before the request
    is sent. Progress and diagnostics are printed to stdout.

    Args:
        query (str): The search query (e.g., 'Apple stock').
        from_date (str): Start date in 'YYYY-MM-DD' format.
        to_date (str): End date in 'YYYY-MM-DD' format.
        language (str): Language of the articles (default: 'en').
        sort_by (str): 'relevancy' (default), 'popularity' or 'publishedAt'.
        page_size (int): Number of results per page (default: 100).

    Returns:
        list: The article dictionaries from the response (possibly empty),
        or None when the key is missing, the response status is not 'ok',
        or an exception is raised.
    """
    print(f"Attempting to fetch news with query: '{query}'")
    print(f"Date range: {from_date} to {to_date}")

    api_key = load_api_keys()[0]
    if not api_key:
        print("Error: NewsAPI key not available in environment variables. Cannot fetch news.")
        return None

    try:
        client = NewsApiClient(api_key=api_key)

        # The free tier only searches roughly the last month; 29 days leaves
        # a safety margin. ISO 'YYYY-MM-DD' strings order chronologically, so
        # a plain string comparison is sufficient here.
        cutoff = (datetime.now() - timedelta(days=29)).strftime('%Y-%m-%d')
        print(f"One month ago date limit (approx): {cutoff}")
        if from_date < cutoff:
            print(f"Warning: NewsAPI free tier limits searches to the past month. Adjusting from_date from {from_date} to {cutoff}")
            from_date = cutoff

        print(f"Calling NewsAPI with: q='{query}', from='{from_date}', to='{to_date}', page_size={page_size}")
        request_args = {
            'q': query,
            'from_param': from_date,
            'to': to_date,
            'language': language,
            'sort_by': sort_by,
            'page_size': page_size,
        }
        response = client.get_everything(**request_args)
        print(f"NewsAPI response status: {response.get('status')}")

        # Failure path first: report the API's own code and message.
        if response['status'] != 'ok':
            print(f"Error fetching news from NewsAPI. Code: {response.get('code')}, Message: {response.get('message')}")
            return None

        found = response['totalResults']
        print(f"Found {found} articles for '{query}'")
        if found == 0:
            print("Warning: NewsAPI returned 0 articles for this query and date range.")
        return response['articles']
    except Exception as exc:
        # Any client, network or response-shape problem ends up here.
        print(f"Exception occurred while connecting to NewsAPI: {exc}")
        return None
# Placeholder for Alpha Vantage data fetching
def get_alpha_vantage_data(symbol):
    """Placeholder for fetching data for a symbol from Alpha Vantage.

    The real API call is not implemented yet. The function only checks that
    an Alpha Vantage key is available via ``load_api_keys()`` and prints a
    status message.

    Args:
        symbol (str): The ticker symbol to fetch (e.g., 'AAPL').

    Returns:
        None: Always, both when the key is missing and when it is present,
        because no data is fetched yet.
    """
    _, alpha_vantage_key = load_api_keys()
    if not alpha_vantage_key:
        # load_api_keys() reads the process environment, not a .env file,
        # so the message points the user at the right place.
        print("Alpha Vantage API key not found in environment variables.")
        return None
    print(f"Fetching data for {symbol} using Alpha Vantage (implementation pending)...")
    # TODO: add the Alpha Vantage API call logic here.
    return None
if __name__ == '__main__':
    # Manual smoke test: runs only when the module is executed directly.
    # Fetches about 30 days of AAPL prices and matching news, then prints
    # a short summary of each result.
    ticker = 'AAPL'
    end_date = datetime.now().strftime('%Y-%m-%d')
    # Look back 30 days from now.
    start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')

    print(f"--- Testing Stock Data Fetching ({ticker}) ---")
    stock_data = get_stock_data(ticker, start_date, end_date)
    if stock_data is None:
        print("Failed to fetch stock data.")
    else:
        print(f"Successfully fetched {len(stock_data)} rows of stock data.")
        print(stock_data.head())

    print(f"\n--- Testing News Article Fetching ({ticker}) ---")
    news_query = f"{ticker} stock"
    articles = get_news_articles(news_query, start_date, end_date)
    if articles is None:
        print("Failed to fetch news articles.")
    else:
        print(f"Successfully fetched {len(articles)} articles.")
        # An empty list is a successful call with no matches; only show a
        # title when at least one article came back.
        if articles:
            print("First article title:", articles[0]['title'])

    # Alpha Vantage check is disabled while the fetcher is a placeholder:
    # print("\n--- Testing Alpha Vantage (Placeholder) ---")
    # get_alpha_vantage_data(ticker)