# Source code for src.processing.transformer (Sphinx viewcode page header)
"""Data transformation and normalization."""
from typing import Dict, Any, List
from datetime import datetime
import pandas as pd
from ..utils.errors import TransformError
from ..utils.logging import get_logger
logger = get_logger(__name__)
# [docs]  (Sphinx viewcode link artifact)
class DataTransformer:
    """Transform raw API data into structured DataFrames.

    Stateless helper: every method takes its inputs explicitly and returns
    a new DataFrame (inputs are copied, never mutated in place).
    """
# [docs]  (Sphinx viewcode link artifact)
def transform_interest_data(
self,
api_response: Dict[str, Any],
query_labels: Dict[str, str]
) -> pd.DataFrame:
"""
Transform Google Trends interest_over_time data into DataFrame.
Args:
api_response: Raw API response
query_labels: Mapping of queries to display labels
Returns:
DataFrame with columns: date, query, value
Raises:
TransformError: If transformation fails
"""
try:
interest_data = api_response.get("interest_over_time", {})
timeline_data = interest_data.get("timeline_data", [])
if not timeline_data:
raise TransformError("No timeline data found in API response")
rows = []
for time_point in timeline_data:
# Parse date
date_str = time_point.get("date", "")
timestamp = time_point.get("timestamp")
if timestamp:
date = datetime.fromtimestamp(int(timestamp))
else:
date = self._parse_date_string(date_str)
# Extract values for each query
values = time_point.get("values", [])
for value_data in values:
query = value_data.get("query", "")
value = self._parse_interest_value(value_data)
# Use label if available
label = query_labels.get(query, query)
rows.append({
"date": date,
"query": label,
"value": value
})
df = pd.DataFrame(rows)
# Ensure proper types
df["date"] = pd.to_datetime(df["date"])
df["value"] = pd.to_numeric(df["value"], errors="coerce")
# Sort by date
df = df.sort_values("date").reset_index(drop=True)
logger.info(f"Transformed {len(df)} data points for {df['query'].nunique()} queries")
return df
except Exception as e:
raise TransformError(f"Failed to transform interest data: {e}")
def _parse_date_string(self, date_str: str) -> datetime:
"""
Parse various date string formats from Google Trends.
Args:
date_str: Date string (e.g., "Nov 1 – 7, 2024" or "Nov 2024")
Returns:
datetime object
"""
try:
# Handle date ranges (take start date)
if "–" in date_str or "-" in date_str:
# Split on dash and comma
parts = date_str.replace("–", "-").split(",")
if len(parts) > 1:
month_day = parts[0].split("-")[0].strip()
year = parts[1].strip()
date_str = f"{month_day} {year}"
# Try common formats
for fmt in ["%b %d %Y", "%b %Y", "%Y-%m-%d"]:
try:
return datetime.strptime(date_str.strip(), fmt)
except ValueError:
continue
# Fallback: use current date
logger.warning(f"Could not parse date: {date_str}, using current date")
return datetime.now()
except Exception:
return datetime.now()
def _parse_interest_value(self, value_data: Dict[str, Any]) -> float:
"""
Parse interest value from API response.
Args:
value_data: Value data dictionary
Returns:
Numeric interest value
"""
# Try extracted_value first
value = value_data.get("extracted_value")
if value is not None:
return float(value)
# Try value field
value = value_data.get("value", 0)
# Handle special values
if isinstance(value, str):
value = value.strip()
if value == "<1":
return 0.5
if value == "":
return 0.0
try:
return float(value)
except (ValueError, TypeError):
return 0.0
# [docs]  (Sphinx viewcode link artifact)
def add_moving_average(
self,
df: pd.DataFrame,
window: int = 3,
column: str = "value"
) -> pd.DataFrame:
"""
Add moving average column to DataFrame.
Args:
df: Input DataFrame
window: Window size for moving average
column: Column to smooth
Returns:
DataFrame with added value_ma column
"""
df = df.copy()
# Calculate moving average per query
df["value_ma"] = df.groupby("query")[column].transform(
lambda x: x.rolling(window=window, min_periods=1, center=False).mean()
)
logger.debug(f"Added moving average with window={window}")
return df
# [docs]  (Sphinx viewcode link artifact)
def normalize_to_100(self, df: pd.DataFrame, column: str = "value") -> pd.DataFrame:
"""
Normalize values to 0-100 scale.
Args:
df: Input DataFrame
column: Column to normalize
Returns:
DataFrame with normalized column
"""
df = df.copy()
# Find global max
max_value = df[column].max()
if max_value > 0:
df[f"{column}_normalized"] = (df[column] / max_value) * 100
else:
df[f"{column}_normalized"] = 0
return df