Source code for src.data.providers.serpapi

"""SerpAPI provider implementation for Google Trends data."""

import os
import time
from typing import Dict, Any, List
import requests

from .base import DataProvider
from ...utils.errors import APIError, RateLimitError, InvalidResponseError
from ...utils.logging import get_logger

logger = get_logger(__name__)


[docs] class SerpAPIProvider(DataProvider): """SerpAPI implementation for fetching Google Trends data.""" def __init__( self, api_key: str, max_retries: int = 3, retry_delay: int = 5, timeout: int = 30 ): """ Initialize SerpAPI provider. Args: api_key: SerpAPI key max_retries: Maximum retry attempts retry_delay: Initial delay between retries (seconds) timeout: Request timeout (seconds) """ self.api_key = api_key self.max_retries = max_retries self.retry_delay = retry_delay self.timeout = timeout self.base_url = "https://serpapi.com/search"
[docs] def validate_connection(self) -> bool: """ Validate API connection. Returns: True if connection is valid Raises: APIError: If validation fails """ try: # Simple test request params = { "engine": "google_trends", "q": "test", "api_key": self.api_key } response = requests.get( self.base_url, params=params, timeout=self.timeout ) if response.status_code == 401: raise APIError("Invalid API key") elif response.status_code == 429: raise RateLimitError("Rate limit exceeded") return response.status_code == 200 except requests.exceptions.RequestException as e: raise APIError(f"Connection validation failed: {e}")
def _make_request_with_retry(self, params: Dict[str, Any]) -> Dict[str, Any]: """ Make API request with exponential backoff retry. Args: params: Request parameters Returns: Parsed JSON response Raises: APIError: If all retries fail RateLimitError: If rate limit exceeded """ last_exception = None for attempt in range(self.max_retries + 1): try: response = requests.get( self.base_url, params=params, timeout=self.timeout ) # Handle rate limiting if response.status_code == 429: if attempt < self.max_retries: sleep_time = self.retry_delay * (2 ** attempt) logger.warning(f"Rate limited. Retrying in {sleep_time}s...") time.sleep(sleep_time) continue raise RateLimitError("Rate limit exceeded after all retries") # Handle other errors if response.status_code != 200: error_msg = f"API returned status {response.status_code}" try: error_data = response.json() if "error" in error_data: error_msg = error_data["error"] except: pass if attempt < self.max_retries: sleep_time = self.retry_delay * (2 ** attempt) logger.warning(f"Request failed: {error_msg}. Retrying in {sleep_time}s...") time.sleep(sleep_time) continue raise APIError(error_msg) # Parse response try: data = response.json() return data except ValueError as e: raise InvalidResponseError(f"Failed to parse JSON response: {e}") except requests.exceptions.Timeout as e: last_exception = e if attempt < self.max_retries: sleep_time = self.retry_delay * (2 ** attempt) logger.warning(f"Request timeout. Retrying in {sleep_time}s...") time.sleep(sleep_time) continue except requests.exceptions.RequestException as e: last_exception = e if attempt < self.max_retries: sleep_time = self.retry_delay * (2 ** attempt) logger.warning(f"Request failed: {e}. Retrying in {sleep_time}s...") time.sleep(sleep_time) continue # All retries exhausted raise APIError(f"Request failed after {self.max_retries + 1} attempts: {last_exception}")
[docs] def create_provider(api_key: str, **options) -> SerpAPIProvider: """ Factory function to create SerpAPI provider. Args: api_key: SerpAPI key **options: Provider options (max_retries, retry_delay, timeout) Returns: Configured SerpAPIProvider instance """ return SerpAPIProvider( api_key=api_key, max_retries=options.get("max_retries", 3), retry_delay=options.get("retry_delay", 5), timeout=options.get("timeout", 30) )