"""SerpAPI provider implementation for Google Trends data."""
import os
import time
from typing import Dict, Any, List
import requests
from .base import DataProvider
from ...utils.errors import APIError, RateLimitError, InvalidResponseError
from ...utils.logging import get_logger
logger = get_logger(__name__)
class SerpAPIProvider(DataProvider):
    """SerpAPI implementation for fetching Google Trends data."""

    def __init__(
        self,
        api_key: str,
        max_retries: int = 3,
        retry_delay: int = 5,
        timeout: int = 30
    ):
        """
        Initialize SerpAPI provider.

        Args:
            api_key: SerpAPI key
            max_retries: Maximum retry attempts
            retry_delay: Initial delay between retries (seconds);
                doubled on each subsequent attempt (exponential backoff)
            timeout: Request timeout (seconds)
        """
        self.api_key = api_key
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.timeout = timeout
        self.base_url = "https://serpapi.com/search"

    def fetch_trends(
        self,
        queries: List[str],
        geo: str,
        date_range: str,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Fetch Google Trends data via SerpAPI.

        Args:
            queries: List of search queries (1-5)
            geo: Geographic region code (e.g., 'GB', 'US')
            date_range: Date range (e.g., 'today 12-m', 'today 5-y')
            **kwargs: Additional parameters (currently only 'tz' is forwarded)

        Returns:
            Parsed API response containing interest_over_time data

        Raises:
            ValueError: If queries is empty or has more than 5 entries
            APIError: If request fails
            RateLimitError: If rate limit exceeded
            InvalidResponseError: If response is invalid
        """
        # Google Trends compares at most 5 terms per request.
        if not queries or len(queries) > 5:
            raise ValueError("Must provide 1-5 queries")

        params = {
            "engine": "google_trends",
            "q": ",".join(queries),
            "geo": geo,
            "date": date_range,
            "data_type": "TIMESERIES",
            "api_key": self.api_key
        }
        # Optional timezone offset passthrough.
        if "tz" in kwargs:
            params["tz"] = kwargs["tz"]

        logger.debug("Fetching trends for %d queries: %s", len(queries), queries)

        response = self._make_request_with_retry(params)

        # A 200 response can still be a logical failure; surface the API's
        # own error message when present.
        if "interest_over_time" not in response:
            if "error" in response:
                raise InvalidResponseError(f"API error: {response['error']}")
            raise InvalidResponseError("No interest_over_time data in response")

        logger.info("Successfully fetched trends data for %d queries", len(queries))
        return response

    def validate_connection(self) -> bool:
        """
        Validate API connection with a minimal test request.

        Returns:
            True if connection is valid (HTTP 200)

        Raises:
            APIError: If the API key is rejected or the request cannot be sent
            RateLimitError: If rate limit exceeded
        """
        try:
            params = {
                "engine": "google_trends",
                "q": "test",
                "api_key": self.api_key
            }
            response = requests.get(
                self.base_url,
                params=params,
                timeout=self.timeout
            )
            if response.status_code == 401:
                raise APIError("Invalid API key")
            elif response.status_code == 429:
                raise RateLimitError("Rate limit exceeded")
            return response.status_code == 200
        except requests.exceptions.RequestException as e:
            raise APIError(f"Connection validation failed: {e}")

    def _make_request_with_retry(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Make API request with exponential backoff retry.

        Retries on HTTP 429, non-200 statuses, timeouts, and transport
        errors; the sleep before attempt ``k`` is ``retry_delay * 2**k``.

        Args:
            params: Request parameters

        Returns:
            Parsed JSON response

        Raises:
            APIError: If all retries fail
            RateLimitError: If rate limit exceeded after all retries
            InvalidResponseError: If a 200 response is not valid JSON
        """
        last_exception = None
        for attempt in range(self.max_retries + 1):
            try:
                response = requests.get(
                    self.base_url,
                    params=params,
                    timeout=self.timeout
                )

                # Rate limiting: back off and retry, then give up loudly.
                if response.status_code == 429:
                    if attempt < self.max_retries:
                        sleep_time = self.retry_delay * (2 ** attempt)
                        logger.warning("Rate limited. Retrying in %ss...", sleep_time)
                        time.sleep(sleep_time)
                        continue
                    raise RateLimitError("Rate limit exceeded after all retries")

                # Other non-success statuses: prefer the API's own error
                # message if the body is JSON with an "error" field.
                if response.status_code != 200:
                    error_msg = f"API returned status {response.status_code}"
                    try:
                        error_data = response.json()
                        if "error" in error_data:
                            error_msg = error_data["error"]
                    except ValueError:
                        # Body was not JSON; keep the generic status message.
                        pass
                    if attempt < self.max_retries:
                        sleep_time = self.retry_delay * (2 ** attempt)
                        logger.warning(
                            "Request failed: %s. Retrying in %ss...", error_msg, sleep_time
                        )
                        time.sleep(sleep_time)
                        continue
                    raise APIError(error_msg)

                # Success path: parse and return the JSON body.
                try:
                    return response.json()
                except ValueError as e:
                    raise InvalidResponseError(f"Failed to parse JSON response: {e}")

            except requests.exceptions.Timeout as e:
                last_exception = e
                if attempt < self.max_retries:
                    sleep_time = self.retry_delay * (2 ** attempt)
                    logger.warning("Request timeout. Retrying in %ss...", sleep_time)
                    time.sleep(sleep_time)
                    continue
            except requests.exceptions.RequestException as e:
                last_exception = e
                if attempt < self.max_retries:
                    sleep_time = self.retry_delay * (2 ** attempt)
                    logger.warning("Request failed: %s. Retrying in %ss...", e, sleep_time)
                    time.sleep(sleep_time)
                    continue

        # All retries exhausted on timeout/transport errors.
        raise APIError(f"Request failed after {self.max_retries + 1} attempts: {last_exception}")
def create_provider(api_key: str, **options) -> SerpAPIProvider:
    """
    Factory function to create SerpAPI provider.

    Args:
        api_key: SerpAPI key
        **options: Provider options (max_retries, retry_delay, timeout);
            unrecognized keys are ignored

    Returns:
        Configured SerpAPIProvider instance
    """
    defaults = {"max_retries": 3, "retry_delay": 5, "timeout": 30}
    # Pick only the known settings, falling back to defaults; extra
    # option keys are deliberately dropped rather than passed through.
    settings = {name: options.get(name, value) for name, value in defaults.items()}
    return SerpAPIProvider(api_key=api_key, **settings)