# Source code for src.processing.calculator (recovered from generated documentation)

"""Share of Search metrics calculation."""

from typing import Dict, List
import pandas as pd
import numpy as np

from ..utils.errors import CalculationError
from ..utils.logging import get_logger

logger = get_logger(__name__)


class MetricsCalculator:
    """Calculate Share of Search metrics and derivatives."""
[docs] def calculate_period_metrics( self, df: pd.DataFrame, group_by: str = "query" ) -> pd.DataFrame: """ Calculate aggregate metrics per query. Args: df: DataFrame with share_of_search column group_by: Column to group by (default: query) Returns: DataFrame with aggregate metrics per query """ try: metrics = df.groupby(group_by).agg({ 'share_of_search': ['mean', 'std', 'min', 'max'], 'value': ['sum', 'mean'] }).round(2) # Flatten column names metrics.columns = ['_'.join(col).strip() for col in metrics.columns.values] metrics = metrics.reset_index() # Rename for clarity metrics = metrics.rename(columns={ 'share_of_search_mean': 'avg_share', 'share_of_search_std': 'volatility', 'share_of_search_min': 'min_share', 'share_of_search_max': 'max_share', 'value_sum': 'total_interest', 'value_mean': 'avg_interest' }) logger.info(f"Calculated period metrics for {len(metrics)} {group_by}s") return metrics except Exception as e: raise CalculationError(f"Failed to calculate period metrics: {e}")
[docs] def calculate_trend_direction(self, df: pd.DataFrame) -> pd.DataFrame: """ Calculate trend direction (rising/stable/falling). Args: df: DataFrame with date and share_of_search columns Returns: DataFrame with added trend_direction column """ try: df = df.copy() # Calculate linear regression slope per query def calc_slope(series): if len(series) < 2: return 0 x = np.arange(len(series)) y = series.values # Filter out NaN mask = ~np.isnan(y) if mask.sum() < 2: return 0 x = x[mask] y = y[mask] # Calculate slope slope = np.polyfit(x, y, 1)[0] return slope slopes = df.groupby('query')['share_of_search'].apply(calc_slope) # Classify trends trend_map = {} for query, slope in slopes.items(): if slope > 0.1: trend_map[query] = "rising" elif slope < -0.1: trend_map[query] = "falling" else: trend_map[query] = "stable" df['trend_direction'] = df['query'].map(trend_map) df['trend_slope'] = df['query'].map(slopes) logger.info("Calculated trend directions") return df except Exception as e: raise CalculationError(f"Failed to calculate trend direction: {e}")
[docs] def calculate_momentum(self, df: pd.DataFrame, periods: int = 3) -> pd.DataFrame: """ Calculate momentum (rate of change). Args: df: DataFrame with share_of_search column periods: Number of periods to look back Returns: DataFrame with added momentum column """ try: df = df.copy() # Calculate period-over-period change df['momentum'] = df.groupby('query')['share_of_search'].pct_change(periods=periods) * 100 df['momentum'] = df['momentum'].fillna(0).round(2) logger.debug(f"Calculated momentum with {periods}-period lookback") return df except Exception as e: raise CalculationError(f"Failed to calculate momentum: {e}")
[docs] def calculate_volatility(self, df: pd.DataFrame, window: int = 4) -> pd.DataFrame: """ Calculate rolling volatility for all queries. Args: df: DataFrame with share_of_search column window: Rolling window size Returns: DataFrame with added volatility column """ try: df = df.copy() # Calculate rolling standard deviation per query df['volatility'] = df.groupby('query')['share_of_search'].transform( lambda x: x.rolling(window=window, min_periods=1).std() ) df['volatility'] = df['volatility'].fillna(0).round(2) logger.debug(f"Calculated volatility with {window}-period window") return df except Exception as e: raise CalculationError(f"Failed to calculate volatility: {e}")
[docs] def calculate_volatility_value(self, df: pd.DataFrame, query: str, window: int = 4) -> float: """ Calculate volatility value for a specific query. Args: df: DataFrame with share_of_search column query: Query to calculate volatility for window: Rolling window size Returns: Volatility value (standard deviation) for the query """ try: # Filter to specific query and sort by date query_data = df[df['query'] == query].sort_values('date') if len(query_data) == 0: return 0.0 # Calculate rolling standard deviation volatility_series = ( query_data['share_of_search'] .rolling(window=window, min_periods=1) .std() ) # Return the most recent volatility value latest_volatility = volatility_series.iloc[-1] if not volatility_series.empty else 0.0 return float(max(latest_volatility, 0.0)) except Exception as e: raise CalculationError(f"Failed to calculate volatility for {query}: {e}")
[docs] def detect_anomalies( self, df: pd.DataFrame, column: str = "share_of_search", threshold: float = 2.5 ) -> pd.DataFrame: """ Detect anomalies using z-score method. Args: df: DataFrame to analyze column: Column to check for anomalies threshold: Z-score threshold for anomaly detection Returns: DataFrame with added is_anomaly and z_score columns """ try: df = df.copy() # Calculate z-scores per query df['z_score'] = df.groupby('query')[column].transform( lambda x: (x - x.mean()) / x.std() if x.std() > 0 else 0 ) # Mark anomalies df['is_anomaly'] = (df['z_score'].abs() > threshold) anomaly_count = df['is_anomaly'].sum() logger.info(f"Detected {anomaly_count} anomalies (threshold: {threshold})") return df except Exception as e: raise CalculationError(f"Failed to detect anomalies: {e}")
[docs] def calculate_market_concentration(self, df: pd.DataFrame) -> Dict[str, float]: """ Calculate Herfindahl-Hirschman Index (HHI) for market concentration. Args: df: DataFrame with share_of_search column Returns: Dictionary with HHI metrics """ try: # Get latest period's shares latest_date = df['date'].max() latest = df[df['date'] == latest_date] # Calculate HHI (sum of squared market shares) shares = latest['share_of_search'].values hhi = (shares ** 2).sum() # Interpret HHI if hhi < 1500: concentration = "low" elif hhi < 2500: concentration = "moderate" else: concentration = "high" result = { "hhi": round(hhi, 2), "concentration": concentration, "num_competitors": len(shares), "date": latest_date } logger.info(f"Market concentration: HHI={hhi:.2f} ({concentration})") return result except Exception as e: raise CalculationError(f"Failed to calculate market concentration: {e}")
[docs] def calculate_all_metrics( self, df: pd.DataFrame, smoothing_window: int = 3 ) -> pd.DataFrame: """ Calculate all available metrics. Args: df: Input DataFrame with date, query, value columns smoothing_window: Window for smoothing calculations Returns: DataFrame with all calculated metrics """ logger.info("Calculating all metrics...") # Core calculations df = self.calculate_share_of_search(df) df = self.calculate_trend_direction(df) df = self.calculate_momentum(df, periods=smoothing_window) df = self.calculate_volatility(df, window=smoothing_window) df = self.detect_anomalies(df) logger.info("All metrics calculated successfully") return df