"""Share of Search metrics calculation."""
from typing import Dict, List
import pandas as pd
import numpy as np
from ..utils.errors import CalculationError
from ..utils.logging import get_logger
logger = get_logger(__name__)
class MetricsCalculator:
    """Calculate Share of Search metrics and derivatives."""

    def calculate_share_of_search(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Calculate Share of Search percentages.

        Args:
            df: DataFrame with columns: date, query, value

        Returns:
            DataFrame with added share_of_search column

        Raises:
            CalculationError: If calculation fails
        """
        try:
            df = df.copy()
            # Total interest per date across all queries; transform keeps
            # the result aligned row-by-row with the original frame.
            df['total'] = df.groupby('date')['value'].transform('sum')
            # Share = value / total as a percentage. A zero total produces
            # NaN (0/0) or +/-inf (x/0); normalize both to 0 so downstream
            # math stays finite.
            df['share_of_search'] = (
                (df['value'] / df['total'] * 100)
                .replace([np.inf, -np.inf], np.nan)
                .fillna(0)
            )
            # Drop temporary total column
            df = df.drop(columns=['total'])
            logger.info("Calculated Share of Search percentages")
            return df
        except Exception as e:
            # Chain the cause so the root traceback is preserved.
            raise CalculationError(f"Failed to calculate Share of Search: {e}") from e
def calculate_period_metrics(
self,
df: pd.DataFrame,
group_by: str = "query"
) -> pd.DataFrame:
"""
Calculate aggregate metrics per query.
Args:
df: DataFrame with share_of_search column
group_by: Column to group by (default: query)
Returns:
DataFrame with aggregate metrics per query
"""
try:
metrics = df.groupby(group_by).agg({
'share_of_search': ['mean', 'std', 'min', 'max'],
'value': ['sum', 'mean']
}).round(2)
# Flatten column names
metrics.columns = ['_'.join(col).strip() for col in metrics.columns.values]
metrics = metrics.reset_index()
# Rename for clarity
metrics = metrics.rename(columns={
'share_of_search_mean': 'avg_share',
'share_of_search_std': 'volatility',
'share_of_search_min': 'min_share',
'share_of_search_max': 'max_share',
'value_sum': 'total_interest',
'value_mean': 'avg_interest'
})
logger.info(f"Calculated period metrics for {len(metrics)} {group_by}s")
return metrics
except Exception as e:
raise CalculationError(f"Failed to calculate period metrics: {e}")
[docs]
def calculate_trend_direction(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Calculate trend direction (rising/stable/falling).
Args:
df: DataFrame with date and share_of_search columns
Returns:
DataFrame with added trend_direction column
"""
try:
df = df.copy()
# Calculate linear regression slope per query
def calc_slope(series):
if len(series) < 2:
return 0
x = np.arange(len(series))
y = series.values
# Filter out NaN
mask = ~np.isnan(y)
if mask.sum() < 2:
return 0
x = x[mask]
y = y[mask]
# Calculate slope
slope = np.polyfit(x, y, 1)[0]
return slope
slopes = df.groupby('query')['share_of_search'].apply(calc_slope)
# Classify trends
trend_map = {}
for query, slope in slopes.items():
if slope > 0.1:
trend_map[query] = "rising"
elif slope < -0.1:
trend_map[query] = "falling"
else:
trend_map[query] = "stable"
df['trend_direction'] = df['query'].map(trend_map)
df['trend_slope'] = df['query'].map(slopes)
logger.info("Calculated trend directions")
return df
except Exception as e:
raise CalculationError(f"Failed to calculate trend direction: {e}")
[docs]
def calculate_momentum(self, df: pd.DataFrame, periods: int = 3) -> pd.DataFrame:
"""
Calculate momentum (rate of change).
Args:
df: DataFrame with share_of_search column
periods: Number of periods to look back
Returns:
DataFrame with added momentum column
"""
try:
df = df.copy()
# Calculate period-over-period change
df['momentum'] = df.groupby('query')['share_of_search'].pct_change(periods=periods) * 100
df['momentum'] = df['momentum'].fillna(0).round(2)
logger.debug(f"Calculated momentum with {periods}-period lookback")
return df
except Exception as e:
raise CalculationError(f"Failed to calculate momentum: {e}")
[docs]
def calculate_volatility(self, df: pd.DataFrame, window: int = 4) -> pd.DataFrame:
"""
Calculate rolling volatility for all queries.
Args:
df: DataFrame with share_of_search column
window: Rolling window size
Returns:
DataFrame with added volatility column
"""
try:
df = df.copy()
# Calculate rolling standard deviation per query
df['volatility'] = df.groupby('query')['share_of_search'].transform(
lambda x: x.rolling(window=window, min_periods=1).std()
)
df['volatility'] = df['volatility'].fillna(0).round(2)
logger.debug(f"Calculated volatility with {window}-period window")
return df
except Exception as e:
raise CalculationError(f"Failed to calculate volatility: {e}")
[docs]
def calculate_volatility_value(self, df: pd.DataFrame, query: str, window: int = 4) -> float:
"""
Calculate volatility value for a specific query.
Args:
df: DataFrame with share_of_search column
query: Query to calculate volatility for
window: Rolling window size
Returns:
Volatility value (standard deviation) for the query
"""
try:
# Filter to specific query and sort by date
query_data = df[df['query'] == query].sort_values('date')
if len(query_data) == 0:
return 0.0
# Calculate rolling standard deviation
volatility_series = (
query_data['share_of_search']
.rolling(window=window, min_periods=1)
.std()
)
# Return the most recent volatility value
latest_volatility = volatility_series.iloc[-1] if not volatility_series.empty else 0.0
return float(max(latest_volatility, 0.0))
except Exception as e:
raise CalculationError(f"Failed to calculate volatility for {query}: {e}")
[docs]
def detect_anomalies(
self,
df: pd.DataFrame,
column: str = "share_of_search",
threshold: float = 2.5
) -> pd.DataFrame:
"""
Detect anomalies using z-score method.
Args:
df: DataFrame to analyze
column: Column to check for anomalies
threshold: Z-score threshold for anomaly detection
Returns:
DataFrame with added is_anomaly and z_score columns
"""
try:
df = df.copy()
# Calculate z-scores per query
df['z_score'] = df.groupby('query')[column].transform(
lambda x: (x - x.mean()) / x.std() if x.std() > 0 else 0
)
# Mark anomalies
df['is_anomaly'] = (df['z_score'].abs() > threshold)
anomaly_count = df['is_anomaly'].sum()
logger.info(f"Detected {anomaly_count} anomalies (threshold: {threshold})")
return df
except Exception as e:
raise CalculationError(f"Failed to detect anomalies: {e}")
[docs]
def calculate_market_concentration(self, df: pd.DataFrame) -> Dict[str, float]:
"""
Calculate Herfindahl-Hirschman Index (HHI) for market concentration.
Args:
df: DataFrame with share_of_search column
Returns:
Dictionary with HHI metrics
"""
try:
# Get latest period's shares
latest_date = df['date'].max()
latest = df[df['date'] == latest_date]
# Calculate HHI (sum of squared market shares)
shares = latest['share_of_search'].values
hhi = (shares ** 2).sum()
# Interpret HHI
if hhi < 1500:
concentration = "low"
elif hhi < 2500:
concentration = "moderate"
else:
concentration = "high"
result = {
"hhi": round(hhi, 2),
"concentration": concentration,
"num_competitors": len(shares),
"date": latest_date
}
logger.info(f"Market concentration: HHI={hhi:.2f} ({concentration})")
return result
except Exception as e:
raise CalculationError(f"Failed to calculate market concentration: {e}")
[docs]
def calculate_all_metrics(
self,
df: pd.DataFrame,
smoothing_window: int = 3
) -> pd.DataFrame:
"""
Calculate all available metrics.
Args:
df: Input DataFrame with date, query, value columns
smoothing_window: Window for smoothing calculations
Returns:
DataFrame with all calculated metrics
"""
logger.info("Calculating all metrics...")
# Core calculations
df = self.calculate_share_of_search(df)
df = self.calculate_trend_direction(df)
df = self.calculate_momentum(df, periods=smoothing_window)
df = self.calculate_volatility(df, window=smoothing_window)
df = self.detect_anomalies(df)
logger.info("All metrics calculated successfully")
return df