Files
vyndr/nba-service/app/services/stats.py
T
builtbykev 3da1b4242c feat: Feature 1.2 (NBA stats FastAPI service) + Feature 1.4 (database schema)
Feature 1.2: Python FastAPI microservice wrapping nba_api
- GET /stats/season-avg, /stats/last-n, /stats/splits, /players/search
- Redis caching (24hr/1hr/6hr/7day), 0.6s rate limiting, PRA derived stat
- 27 Python tests passing

Feature 1.4: Complete Supabase database schema
- 6 tables: users, picks, scan_sessions, bets, outcomes, performance
- RLS enabled on all tables with auth.uid() policies
- 3 triggers: auto-create user, updated_at, scan count reset
- 37 schema validation tests passing
- Migration SQL ready, pending manual apply (WSL2 DNS blocker)

Total: 92 tests (65 Node.js + 27 Python), all passing

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 10:58:58 -04:00

320 lines
9.2 KiB
Python

import time
from datetime import datetime, timedelta, timezone
from nba_api.stats.endpoints import playercareerstats, playergamelog
from nba_api.stats.library.parameters import SeasonAll
from app.utils.cache import cache_get, cache_set
from app.utils.player_map import resolve_player
from app.config import (
SEASON_AVG_TTL, LAST_N_TTL, SPLITS_TTL, NBA_API_DELAY,
NBA_API_RETRY_DELAY, NBA_API_TIMEOUT,
)
# Translation table: nba_api column name (keyword) -> internal stat name.
# Iteration order follows the order the entries are written here.
STAT_MAP = dict(
    PTS="points",
    REB="rebounds",
    AST="assists",
    FG3M="threes",
    BLK="blocks",
    STL="steals",
    TOV="turnovers",
    MIN="minutes",
    GP="games_played",
)
def get_current_season(now=None):
    """Return the NBA season string (e.g., '2025-26') for a point in time.

    A season is labelled by the calendar year it starts in. October and
    later count as the season starting that year; January through September
    belong to the season that started the previous year.

    Args:
        now: Optional datetime to label. Only its ``year`` and ``month`` are
            read. Defaults to the current UTC time.

    Returns:
        The season label in ``YYYY-YY`` form.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    start_year = now.year if now.month >= 10 else now.year - 1
    # Two-digit suffix of the following year ('1999-00', '2025-26').
    return f"{start_year}-{(start_year + 1) % 100:02d}"
def _call_nba_api(fn, **kwargs):
    """Call nba_api with rate limiting and a single retry.

    Sleeps ``NBA_API_DELAY`` seconds before the first attempt. If that
    attempt raises, sleeps ``NBA_API_RETRY_DELAY`` seconds and tries once
    more with the same arguments.

    Args:
        fn: Callable to invoke (the callers in this module pass nba_api
            endpoint classes).
        **kwargs: Keyword arguments forwarded to ``fn``. A caller-supplied
            ``timeout`` is honoured; otherwise ``NBA_API_TIMEOUT`` is used.

    Returns:
        Whatever ``fn`` returns.

    Raises:
        Exception: Whatever the second attempt raises; it is not caught.
    """
    # setdefault rather than an explicit ``timeout=`` argument: passing both
    # ``**kwargs`` containing "timeout" and ``timeout=`` raises TypeError.
    kwargs.setdefault("timeout", NBA_API_TIMEOUT)
    time.sleep(NBA_API_DELAY)
    try:
        return fn(**kwargs)
    except Exception:
        # Deliberately broad: any first-attempt failure earns one retry after
        # a longer pause.
        time.sleep(NBA_API_RETRY_DELAY)
        return fn(**kwargs)
def _map_stats(row):
    """Convert a single nba_api stats row to our internal format.

    Args:
        row: Mapping with a ``get`` method, keyed by nba_api column name
            (the keys of ``STAT_MAP``).

    Returns:
        A dict keyed by internal stat name with values rounded to one
        decimal. Columns that are absent, ``None`` or NaN are left out.
        A derived ``"pra"`` entry (points + rebounds + assists) is always
        present; components that were left out count as 0.
    """
    import math

    stats = {}
    for nba_col, our_name in STAT_MAP.items():
        val = row.get(nba_col)
        if val is None:
            continue
        number = float(val)
        # Missing cells coming out of a DataFrame are NaN rather than None;
        # keeping them would also turn PRA into NaN.
        if math.isnan(number):
            continue
        stats[our_name] = round(number, 1)

    # Compute PRA
    pra_total = (
        stats.get("points", 0)
        + stats.get("rebounds", 0)
        + stats.get("assists", 0)
    )
    stats["pra"] = round(pra_total, 1)
    return stats
def _extract_team(career_data, season):
    """Return the team abbreviation recorded for ``season`` in career stats.

    Looks at the first data frame of ``career_data``. When several rows match
    the season the first one wins; when none match, the last row of the frame
    is used; when the frame has no rows at all the result is "UNK".
    """
    frame = career_data.get_data_frames()[0]
    matching = frame[frame["SEASON_ID"] == season]

    if matching.empty and frame.empty:
        return "UNK"

    chosen = frame.iloc[-1] if matching.empty else matching.iloc[0]
    return chosen["TEAM_ABBREVIATION"]
def get_season_avg(player_name, stat_type=None, season=None):
    """Get a player's per-game season averages.

    Args:
        player_name: Name handed to ``resolve_player``.
        stat_type: Optional internal stat name (a ``STAT_MAP`` value or
            "pra"). When it is present in the computed stats, only that stat
            is returned; otherwise all stats are returned.
        season: Season ID such as "2025-26". Defaults to
            ``get_current_season()``.

    Returns:
        ``None`` when the player cannot be resolved. Otherwise a dict with
        the keys ``player``, ``player_id``, ``team``, ``season``, ``source``
        ("live" or "cache") and ``stats``. ``stats`` is empty (and nothing is
        cached) when the career table has no row for the season.
    """
    player_id, full_name = resolve_player(player_name)
    if player_id is None:
        return None
    if season is None:
        season = get_current_season()

    cache_key = f"nba:season:{player_id}:{season}"
    cached = cache_get(cache_key)
    if cached is not None:
        # Shallow copy: the payload is relabelled and possibly narrowed to
        # one stat below, and the object handed out by the cache layer
        # should not be altered by that.
        result = dict(cached)
        result["source"] = "cache"
    else:
        # nba_api's PlayerCareerStats defaults to season *totals*; request
        # per-game numbers explicitly so the values really are averages.
        career = _call_nba_api(
            playercareerstats.PlayerCareerStats,
            player_id=player_id,
            per_mode36="PerGame",
        )
        df = career.get_data_frames()[0]
        season_row = df[df["SEASON_ID"] == season]
        if season_row.empty:
            return {
                "player": full_name,
                "player_id": player_id,
                "team": "UNK",
                "season": season,
                "source": "live",
                "stats": {},
            }

        # NOTE(review): presumably a player traded mid-season has several
        # rows for one season; only the first is used here - confirm that is
        # the intended row.
        row = season_row.iloc[0].to_dict()
        result = {
            "player": full_name,
            "player_id": player_id,
            "team": row.get("TEAM_ABBREVIATION", "UNK"),
            "season": season,
            "source": "live",
            "stats": _map_stats(row),
        }
        # The full payload is cached; narrowing happens per request.
        cache_set(cache_key, result, SEASON_AVG_TTL)

    stats = result["stats"]
    if stat_type and stat_type in stats:
        return {**result, "stats": {stat_type: stats[stat_type]}}
    return result
def get_last_n(player_name, n=10, stat_type=None):
    """Get a player's averages over their last N games of the current season.

    Args:
        player_name: Name handed to ``resolve_player``.
        n: Number of games to average; clamped to the range 1..30.
        stat_type: Optional internal stat name (a ``STAT_MAP`` value or
            "pra"). When it is present in the computed stats, only that stat
            is returned; otherwise all stats are returned.

    Returns:
        ``None`` when the player cannot be resolved. Otherwise a dict with
        the keys ``player``, ``player_id``, ``team``, ``last_n`` (the clamped
        ``n``), ``source`` ("live" or "cache") and ``stats``. ``stats`` is
        empty (and nothing is cached) when the game log has no rows;
        ``stats["games_played"]`` is the number of games actually averaged.
    """
    player_id, full_name = resolve_player(player_name)
    if player_id is None:
        return None

    n = min(max(n, 1), 30)
    cache_key = f"nba:last:{player_id}:{n}"
    cached = cache_get(cache_key)
    if cached is not None:
        # Shallow copy so relabelling/narrowing below does not alter the
        # object handed out by the cache layer.
        result = dict(cached)
        result["source"] = "cache"
    else:
        gamelog = _call_nba_api(
            playergamelog.PlayerGameLog,
            player_id=player_id,
            season=get_current_season(),
        )
        df = gamelog.get_data_frames()[0]
        if df.empty:
            return {
                "player": full_name,
                "player_id": player_id,
                "team": "UNK",
                "last_n": n,
                "source": "live",
                "stats": {},
            }

        # Assumes the game log lists the most recent game first - TODO confirm.
        recent = df.head(n)
        latest_game = recent.iloc[0]

        # The game-log result set documented by nba_api carries no
        # TEAM_ABBREVIATION column, so fall back to the player's side of
        # MATCHUP ("LAL vs. BOS" / "LAL @ BOS" -> "LAL").
        team = latest_game.get("TEAM_ABBREVIATION")
        if not isinstance(team, str) or not team:
            matchup = latest_game.get("MATCHUP")
            tokens = matchup.split() if isinstance(matchup, str) else []
            team = tokens[0] if tokens else "UNK"

        # Compute averages
        avg_row = {
            col: recent[col].mean()
            for col in STAT_MAP
            if col in recent.columns
        }
        avg_row["GP"] = len(recent)

        result = {
            "player": full_name,
            "player_id": player_id,
            "team": team,
            "last_n": n,
            "source": "live",
            "stats": _map_stats(avg_row),
        }
        # The full payload is cached; narrowing happens per request.
        cache_set(cache_key, result, LAST_N_TTL)

    stats = result["stats"]
    if stat_type and stat_type in stats:
        return {**result, "stats": {stat_type: stats[stat_type]}}
    return result
def get_splits(player_name, stat_type, split_type, opponent=None):
    """Get situational splits for a player over the current season.

    Args:
        player_name: Name handed to ``resolve_player``.
        stat_type: Internal stat name (a ``STAT_MAP`` value) or "pra".
        split_type: One of "home_away", "rest_days" or "vs_team".
        opponent: Opponent team abbreviation; required for "vs_team".
            Compared case-insensitively against the opponent side of
            ``MATCHUP``.

    Returns:
        ``None`` when the player cannot be resolved, when ``stat_type`` or
        ``split_type`` is unknown, when "vs_team" is requested without an
        opponent, or when the game log lacks the column for the stat.
        Otherwise a dict with the keys ``player``, ``player_id``, ``team``,
        ``stat_type``, ``split_type``, ``source`` ("live" or "cache"),
        ``splits`` and - when an opponent was given and games exist -
        ``opponent``. Each split is ``{"avg": ..., "games": ...}`` with
        ``avg`` 0 for an empty bucket. ``splits`` is empty (and nothing is
        cached) when the game log has no rows.
    """
    player_id, full_name = resolve_player(player_name)
    if player_id is None:
        return None

    # Reject requests that can never succeed before spending a cache lookup
    # and a rate-limited API call on them.
    reverse_map = {v: k for k, v in STAT_MAP.items()}
    if stat_type == "pra":
        stat_cols = ["PTS", "REB", "AST"]
    elif stat_type in reverse_map:
        stat_cols = [reverse_map[stat_type]]
    else:
        return None
    if split_type not in ("home_away", "rest_days", "vs_team"):
        return None
    if split_type == "vs_team" and not opponent:
        return None

    cache_key = f"nba:splits:{player_id}:{stat_type}:{split_type}"
    if opponent:
        cache_key += f":{opponent}"
    cached = cache_get(cache_key)
    if cached is not None:
        cached["source"] = "cache"
        return cached

    gamelog = _call_nba_api(
        playergamelog.PlayerGameLog,
        player_id=player_id,
        season=get_current_season(),
    )
    df = gamelog.get_data_frames()[0]
    if df.empty:
        return {
            "player": full_name,
            "player_id": player_id,
            "team": "UNK",
            "stat_type": stat_type,
            "split_type": split_type,
            "source": "live",
            "splits": {},
        }
    if any(col not in df.columns for col in stat_cols):
        return None

    # One value per game: the stat itself, or PTS + REB + AST for "pra".
    per_game = df[stat_cols[0]]
    for col in stat_cols[1:]:
        per_game = per_game + df[col]

    def summarise(selector):
        """Average ``per_game`` over a boolean mask or a list of row labels."""
        values = per_game.loc[selector]
        games = len(values)
        return {"avg": round(values.mean(), 1) if games else 0, "games": games}

    # The game-log result set documented by nba_api carries no
    # TEAM_ABBREVIATION column, so fall back to the player's side of MATCHUP
    # ("LAL vs. BOS" / "LAL @ BOS" -> "LAL").
    latest_game = df.iloc[0]
    team = latest_game.get("TEAM_ABBREVIATION")
    if not isinstance(team, str) or not team:
        first_matchup = latest_game.get("MATCHUP")
        tokens = first_matchup.split() if isinstance(first_matchup, str) else []
        team = tokens[0] if tokens else "UNK"

    if split_type == "home_away":
        # MATCHUP contains "vs." for home games, "@" for away. Literal match:
        # as a regex the "." in "vs." would match any character.
        matchup = df["MATCHUP"]
        splits = {
            "home": summarise(matchup.str.contains("vs.", regex=False, na=False)),
            "away": summarise(matchup.str.contains("@", regex=False, na=False)),
        }
    elif split_type == "rest_days":
        # Normalise to naive datetimes so every pair can be subtracted,
        # whatever the parser returned, then walk the games oldest-first.
        game_dates = df["GAME_DATE"].apply(
            lambda raw: _parse_game_date(raw).replace(tzinfo=None)
        ).sort_values()

        buckets = {"b2b": [], "1_day_rest": [], "2_plus_days_rest": []}
        previous = None
        for row_label, played_on in game_dates.items():
            if previous is None:
                # No earlier game to measure against: counted as rested.
                bucket = "2_plus_days_rest"
            else:
                gap_days = (played_on - previous).days
                if gap_days <= 1:
                    bucket = "b2b"
                elif gap_days == 2:
                    bucket = "1_day_rest"
                else:
                    bucket = "2_plus_days_rest"
            buckets[bucket].append(row_label)
            previous = played_on
        splits = {name: summarise(labels) for name, labels in buckets.items()}
    else:  # split_type == "vs_team" (validated above)
        # Compare against the opponent side only (last token of MATCHUP).
        # A substring/regex search over the whole string would also match the
        # player's own team and would interpret regex metacharacters in the
        # caller-supplied opponent.
        wanted = opponent.strip().upper()
        is_opponent = df["MATCHUP"].str.split().str[-1] == wanted
        splits = {
            "vs_opponent": summarise(is_opponent),
            "vs_all_others": summarise(~is_opponent),
        }

    result = {
        "player": full_name,
        "player_id": player_id,
        "team": team,
        "stat_type": stat_type,
        "split_type": split_type,
        "source": "live",
        "splits": splits,
    }
    if opponent:
        result["opponent"] = opponent
    cache_set(cache_key, result, SPLITS_TTL)
    return result
def _parse_game_date(date_str):
    """Parse a game date string into a naive ``datetime``.

    Accepted formats, tried in order: 'MAR 21, 2026', '2026-03-21' and
    '03/21/2026'. Surrounding whitespace is ignored.

    Args:
        date_str: The date text. Non-string values are treated as
            unparseable.

    Returns:
        The parsed date at midnight, or - when nothing matches - the current
        UTC time. Every return value is timezone-naive so results can be
        sorted and subtracted from one another; mixing a timezone-aware
        fallback with naive parsed dates would raise ``TypeError``.
    """
    if isinstance(date_str, str):
        text = date_str.strip()
        for fmt in ("%b %d, %Y", "%Y-%m-%d", "%m/%d/%Y"):
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
    # Best-effort fallback kept from the original contract: "now", but naive.
    return datetime.now(timezone.utc).replace(tzinfo=None)