"""Player statistics helpers built on nba_api, with result caching.

Public entry points are ``get_season_avg``, ``get_last_n`` and ``get_splits``.
Each resolves a player name, consults the cache, and otherwise calls the
NBA stats API through the rate-limited ``_call_nba_api`` wrapper.
"""

import math
import time
from datetime import datetime, timedelta, timezone

from nba_api.stats.endpoints import playercareerstats, playergamelog
from nba_api.stats.library.parameters import SeasonAll

from app.utils.cache import cache_get, cache_set
from app.utils.player_map import resolve_player
from app.config import (
    SEASON_AVG_TTL,
    LAST_N_TTL,
    SPLITS_TTL,
    NBA_API_DELAY,
    NBA_API_RETRY_DELAY,
    NBA_API_TIMEOUT,
)

# Map nba_api columns to our internal stat names
STAT_MAP = {
    "PTS": "points",
    "REB": "rebounds",
    "AST": "assists",
    "FG3M": "threes",
    "BLK": "blocks",
    "STL": "steals",
    "TOV": "turnovers",
    "MIN": "minutes",
    "GP": "games_played",
}

# Split types understood by get_splits().
_SPLIT_TYPES = ("home_away", "rest_days", "vs_team")

# Team abbreviation the career-stats endpoint uses for the combined row of a
# player who appeared for several teams in one season.
# NOTE(review): based on the stats.nba.com convention - confirm against live data.
_TOTALS_TEAM = "TOT"


def get_current_season():
    """Return current NBA season string (e.g., '2025-26'). Season starts in October."""
    now = datetime.now(timezone.utc)
    # Before October we are still in the season that started the previous year.
    year = now.year if now.month >= 10 else now.year - 1
    return f"{year}-{str(year + 1)[-2:]}"


def _call_nba_api(fn, **kwargs):
    """Call an nba_api endpoint with rate limiting and a single retry.

    Sleeps ``NBA_API_DELAY`` seconds before the first attempt. If that attempt
    raises, sleeps ``NBA_API_RETRY_DELAY`` seconds and tries exactly once more;
    an exception from the second attempt propagates to the caller.

    Args:
        fn: Endpoint class/callable; invoked as ``fn(**kwargs, timeout=...)``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns.
    """
    time.sleep(NBA_API_DELAY)
    try:
        return fn(**kwargs, timeout=NBA_API_TIMEOUT)
    except Exception:
        # Deliberately broad: any failure of the first attempt gets one retry.
        time.sleep(NBA_API_RETRY_DELAY)
        return fn(**kwargs, timeout=NBA_API_TIMEOUT)


def _map_stats(row):
    """Convert a single nba_api stats row to our internal format.

    Args:
        row: Mapping of nba_api column name to value (anything with ``.get``).

    Returns:
        Dict keyed by the internal names in ``STAT_MAP`` with values rounded
        to one decimal, plus ``"pra"`` (points + rebounds + assists, treating
        a missing component as 0). Columns whose value is ``None``, NaN or
        not convertible to ``float`` are omitted.
    """
    stats = {}
    for nba_col, our_name in STAT_MAP.items():
        val = row.get(nba_col)
        if val is None:
            continue
        try:
            number = float(val)
        except (TypeError, ValueError):
            continue
        # pandas represents missing numeric cells as NaN, not None.
        if math.isnan(number):
            continue
        stats[our_name] = round(number, 1)

    # Compute PRA
    pts = stats.get("points", 0)
    reb = stats.get("rebounds", 0)
    ast = stats.get("assists", 0)
    stats["pra"] = round(pts + reb + ast, 1)
    return stats


def _latest_team(rows):
    """Return the team abbreviation of the last non-combined row, or "UNK"."""
    if rows.empty or "TEAM_ABBREVIATION" not in rows.columns:
        return "UNK"
    real_teams = rows[rows["TEAM_ABBREVIATION"] != _TOTALS_TEAM]
    if real_teams.empty:
        return "UNK"
    return real_teams.iloc[-1]["TEAM_ABBREVIATION"]


def _season_stat_row(season_rows):
    """Return, as a dict, the row that represents the player's whole season.

    Prefers the combined ("TOT") row when one exists; otherwise the first row.
    ``season_rows`` must be non-empty.
    """
    if "TEAM_ABBREVIATION" in season_rows.columns:
        combined = season_rows[season_rows["TEAM_ABBREVIATION"] == _TOTALS_TEAM]
        if not combined.empty:
            return combined.iloc[0].to_dict()
    return season_rows.iloc[0].to_dict()


def _team_from_gamelog(df):
    """Best-effort team abbreviation from the first row of a game-log frame.

    Uses ``TEAM_ABBREVIATION`` when that column holds a string; otherwise
    falls back to the first token of ``MATCHUP``.
    NOTE(review): assumes MATCHUP looks like "LAL vs. BOS" / "LAL @ BOS" with
    the player's own team first - confirm against live data.
    """
    if df.empty:
        return "UNK"
    first_row = df.iloc[0]
    team = first_row.get("TEAM_ABBREVIATION")
    if isinstance(team, str) and team:
        return team
    matchup = first_row.get("MATCHUP")
    if isinstance(matchup, str) and matchup.strip():
        return matchup.split()[0]
    return "UNK"


def _as_cache_hit(cached):
    """Return a shallow copy of a cached result marked ``source="cache"``.

    Copying keeps the object handed out by ``cache_get`` untouched, so a
    cache backend that returns shared objects is never modified by a read.
    """
    hit = dict(cached)
    hit["source"] = "cache"
    return hit


def _filter_stats(result, stat_type):
    """Return ``result`` narrowed to ``stat_type`` when that stat is present.

    The input dict is never modified; when no narrowing applies it is
    returned as-is.
    """
    if stat_type and stat_type in result["stats"]:
        narrowed = dict(result)
        narrowed["stats"] = {stat_type: result["stats"][stat_type]}
        return narrowed
    return result


def _extract_team(career_data, season):
    """Extract team abbreviation from career stats for given season.

    Args:
        career_data: Endpoint result exposing ``get_data_frames()``.
        season: Season id to look for in the ``SEASON_ID`` column.

    Returns:
        The last real (non-"TOT") team listed for ``season``; if the season
        has no rows, the last real team in the whole frame; "UNK" otherwise.
    """
    rows = career_data.get_data_frames()[0]
    if rows.empty:
        return "UNK"
    season_rows = rows[rows["SEASON_ID"] == season]
    return _latest_team(rows if season_rows.empty else season_rows)


def get_season_avg(player_name, stat_type=None, season=None):
    """Get a player's season averages.

    Args:
        player_name: Name passed to ``resolve_player``.
        stat_type: Optional internal stat name; when present in the result,
            ``stats`` is narrowed to just that entry.
        season: Season string such as '2025-26'; defaults to the current one.

    Returns:
        ``None`` if the player cannot be resolved. Otherwise a dict with
        ``player``, ``player_id``, ``team``, ``season``, ``source``
        ("cache" or "live") and ``stats`` (empty when the season has no row).
    """
    player_id, full_name = resolve_player(player_name)
    if player_id is None:
        return None

    if season is None:
        season = get_current_season()

    cache_key = f"nba:season:{player_id}:{season}"
    cached = cache_get(cache_key)
    if cached is not None:
        return _filter_stats(_as_cache_hit(cached), stat_type)

    # Request per-game values explicitly: the endpoint's default per-mode is
    # season totals, which would not be averages.
    career = _call_nba_api(
        playercareerstats.PlayerCareerStats,
        player_id=player_id,
        per_mode36="PerGame",
    )
    df = career.get_data_frames()[0]

    season_rows = df[df["SEASON_ID"] == season]
    if season_rows.empty:
        # Not cached, so a later call re-checks the API.
        return {
            "player": full_name,
            "player_id": player_id,
            "team": "UNK",
            "season": season,
            "source": "live",
            "stats": {},
        }

    row = _season_stat_row(season_rows)
    team = row.get("TEAM_ABBREVIATION", "UNK")
    if team == _TOTALS_TEAM:
        # The combined row carries no real team; report the latest one instead.
        team = _latest_team(season_rows)

    result = {
        "player": full_name,
        "player_id": player_id,
        "team": team,
        "season": season,
        "source": "live",
        "stats": _map_stats(row),
    }

    # Cache the full stat set; narrowing happens per request.
    cache_set(cache_key, result, SEASON_AVG_TTL)
    return _filter_stats(result, stat_type)


def get_last_n(player_name, n=10, stat_type=None):
    """Get a player's averages over their last N games.

    Args:
        player_name: Name passed to ``resolve_player``.
        n: Number of games; clamped to the range 1..30.
        stat_type: Optional internal stat name; when present in the result,
            ``stats`` is narrowed to just that entry.

    Returns:
        ``None`` if the player cannot be resolved. Otherwise a dict with
        ``player``, ``player_id``, ``team``, ``last_n``, ``source`` and
        ``stats`` (empty when the current-season game log is empty).
    """
    player_id, full_name = resolve_player(player_name)
    if player_id is None:
        return None

    n = min(max(n, 1), 30)

    cache_key = f"nba:last:{player_id}:{n}"
    cached = cache_get(cache_key)
    if cached is not None:
        return _filter_stats(_as_cache_hit(cached), stat_type)

    season = get_current_season()
    gamelog = _call_nba_api(
        playergamelog.PlayerGameLog,
        player_id=player_id,
        season=season,
    )
    df = gamelog.get_data_frames()[0]

    if df.empty:
        return {
            "player": full_name,
            "player_id": player_id,
            "team": "UNK",
            "last_n": n,
            "source": "live",
            "stats": {},
        }

    # NOTE(review): head(n) assumes the log is ordered most-recent first - confirm.
    last_n_df = df.head(n)
    team = _team_from_gamelog(last_n_df)

    # Compute averages
    avg_row = {
        col: last_n_df[col].mean()
        for col in STAT_MAP
        if col in last_n_df.columns
    }
    # Games played is the sample size, not an average.
    avg_row["GP"] = len(last_n_df)

    result = {
        "player": full_name,
        "player_id": player_id,
        "team": team,
        "last_n": n,
        "source": "live",
        "stats": _map_stats(avg_row),
    }

    cache_set(cache_key, result, LAST_N_TTL)
    return _filter_stats(result, stat_type)


def get_splits(player_name, stat_type, split_type, opponent=None):
    """Get situational splits for a player.

    Args:
        player_name: Name passed to ``resolve_player``.
        stat_type: Internal stat name from ``STAT_MAP`` values, or "pra".
        split_type: One of "home_away", "rest_days" or "vs_team".
        opponent: Text to look for in ``MATCHUP`` (upper-cased before
            matching); required for "vs_team".

    Returns:
        ``None`` if the player cannot be resolved, ``split_type`` or
        ``stat_type`` is unknown, the needed columns are missing from the
        game log, or "vs_team" is requested without ``opponent``. Otherwise
        a dict with ``player``, ``player_id``, ``stat_type``, ``split_type``,
        ``source`` and ``splits`` (plus ``team`` and, when given,
        ``opponent`` for non-empty game logs). Each split entry is
        ``{"avg": <mean rounded to 1 decimal, 0 if no games>, "games": <count>}``.
    """
    player_id, full_name = resolve_player(player_name)
    if player_id is None:
        return None

    # Reject unusable arguments before spending a rate-limited API call.
    if split_type not in _SPLIT_TYPES:
        return None
    if split_type == "vs_team" and not opponent:
        return None

    # Map stat_type to nba_api column(s)
    if stat_type == "pra":
        nba_cols = ["PTS", "REB", "AST"]
    else:
        reverse_map = {v: k for k, v in STAT_MAP.items()}
        nba_col = reverse_map.get(stat_type)
        if nba_col is None:
            return None
        nba_cols = [nba_col]

    cache_key = f"nba:splits:{player_id}:{stat_type}:{split_type}"
    if opponent:
        cache_key += f":{opponent}"
    cached = cache_get(cache_key)
    if cached is not None:
        return _as_cache_hit(cached)

    season = get_current_season()
    gamelog = _call_nba_api(
        playergamelog.PlayerGameLog,
        player_id=player_id,
        season=season,
    )
    df = gamelog.get_data_frames()[0]

    if df.empty:
        return {
            "player": full_name,
            "player_id": player_id,
            "stat_type": stat_type,
            "split_type": split_type,
            "source": "live",
            "splits": {},
        }

    if any(col not in df.columns for col in nba_cols):
        return None

    def avg_stat(subset):
        """Mean of the per-game sum of ``nba_cols`` over ``subset`` (0 if empty)."""
        if subset.empty:
            return 0
        per_game = subset[nba_cols[0]]
        for col in nba_cols[1:]:
            per_game = per_game + subset[col]
        return round(per_game.mean(), 1)

    def summarize(subset):
        return {"avg": avg_stat(subset), "games": len(subset)}

    # Taken before any re-sorting below so it reflects the log's first row.
    team = _team_from_gamelog(df)

    if split_type == "home_away":
        # MATCHUP contains "vs." for home games, "@" for away.
        # regex=False: the "." must be a literal dot, not "any character".
        home = df[df["MATCHUP"].str.contains("vs.", na=False, regex=False)]
        away = df[df["MATCHUP"].str.contains("@", na=False, regex=False)]
        splits = {
            "home": summarize(home),
            "away": summarize(away),
        }

    elif split_type == "rest_days":
        ordered = df.copy()
        ordered["GAME_DATE_PARSED"] = ordered["GAME_DATE"].apply(_parse_game_date)
        ordered = ordered.sort_values("GAME_DATE_PARSED")
        dates = ordered["GAME_DATE_PARSED"].tolist()

        # Row positions (in chronological order) per rest bucket.
        buckets = {"b2b": [], "1_day_rest": [], "2_plus_days_rest": []}
        for pos, game_date in enumerate(dates):
            if pos == 0:
                # No previous game to measure against; counted as rested.
                buckets["2_plus_days_rest"].append(pos)
                continue
            gap_days = (game_date - dates[pos - 1]).days
            if gap_days <= 1:
                buckets["b2b"].append(pos)
            elif gap_days == 2:
                buckets["1_day_rest"].append(pos)
            else:
                buckets["2_plus_days_rest"].append(pos)

        splits = {
            label: summarize(ordered.iloc[positions])
            for label, positions in buckets.items()
        }

    else:  # split_type == "vs_team"; opponent presence was validated above
        opponent_upper = opponent.upper()
        # regex=False: opponent is caller-supplied text, not a pattern.
        # NOTE(review): this is a substring match on MATCHUP, so the player's
        # own team abbreviation would match every game - confirm intent.
        is_vs_opponent = df["MATCHUP"].str.contains(
            opponent_upper, na=False, regex=False
        )
        splits = {
            "vs_opponent": summarize(df[is_vs_opponent]),
            "vs_all_others": summarize(df[~is_vs_opponent]),
        }

    result = {
        "player": full_name,
        "player_id": player_id,
        "team": team,
        "stat_type": stat_type,
        "split_type": split_type,
        "source": "live",
        "splits": splits,
    }
    if opponent:
        result["opponent"] = opponent

    cache_set(cache_key, result, SPLITS_TTL)
    return result


def _parse_game_date(date_str):
    """Parse game date from nba_api format. Handles 'MAR 21, 2026' and similar.

    Tries '%b %d, %Y', '%Y-%m-%d' and '%m/%d/%Y' in that order.

    Returns:
        A naive ``datetime``. If no format matches (or the input is not a
        string), falls back to the current UTC time with the timezone
        stripped, so every value returned here can be compared with and
        subtracted from the others.
    """
    for fmt in ("%b %d, %Y", "%Y-%m-%d", "%m/%d/%Y"):
        try:
            return datetime.strptime(date_str, fmt)
        except (ValueError, TypeError):
            continue
    # strptime yields naive datetimes; keep the fallback naive too, because
    # mixing naive and aware values raises TypeError on sort and subtraction.
    return datetime.now(timezone.utc).replace(tzinfo=None)