/** * ESPN resolution poller — one PM2 process per sport. * * Two jobs: * 1. First time we see STATUS_IN_PROGRESS for a game → trigger OddsPapi * batchCapture for closing lines (CLV reference). * 2. First time we see a FINAL status → wait BUFFER_MS, fetch box score, * POST to /api/grading/resolve. * * Idempotency is enforced via two Redis keys per game: * - game:{id}:status — last status we processed (TTL 36h) * - game:{id}:resolution_lock — set during POST attempt (TTL 5min) * * Never logs VYNDR_INTERNAL_KEY. Headers are constructed inline so the key * never lives in an exported constant that could leak via stack traces. */ const axios = require('axios'); const { getSportConfig } = require('../src/config/sports'); const { cacheGet, cacheSet, getRedisClient } = require('../src/utils/redis'); const { createLimiter, API_BUDGETS } = require('../src/utils/rateLimiter'); const oddsPapiAdapter = require('../src/services/adapters/oddsPapiAdapter'); const SPORT = (process.env.SPORT || '').toLowerCase(); const POLL_INTERVAL_MS = Number(process.env.POLL_INTERVAL) || 60_000; const BUFFER_MS = Number(process.env.BUFFER_MS) || 30_000; const OFF_HOURS_POLL_MS = 5 * 60_000; const VYNDR_API_URL = process.env.VYNDR_API_URL || 'http://localhost:3001'; const NTFY_TOPIC = process.env.NTFY_TOPIC || 'vyndr-admin'; const NTFY_PORT = process.env.NTFY_PORT || '8080'; const STATUS_TTL = 36 * 60 * 60; // 36h, covers doubleheaders const LOCK_TTL = 5 * 60; // 5 min const LIVE_STATUS_TTL = 60 * 60; // 1h const HEARTBEAT_TTL = 180; // 3 min const espnLimiter = createLimiter(API_BUDGETS.espn); const mlbLimiter = createLimiter(API_BUDGETS.mlbStats); function isFinalStatus(state) { if (!state) return false; const upper = String(state).toUpperCase(); return upper.includes('FINAL'); } function isVoidStatus(state) { const upper = String(state || '').toUpperCase(); return upper.includes('POSTPONED') || upper.includes('CANCELED') || upper.includes('CANCELLED'); } function getETHour() { const fmt = new Intl.DateTimeFormat('en-US', { timeZone: 'America/New_York', hour: 'numeric', hour12: false, }); return parseInt(fmt.format(new Date()), 10); } function inGameHours(sportCfg) { const h = getETHour(); // gameEndHourET can be 25 to represent past-midnight ET — wrap with mod. const start = sportCfg.gameStartHourET; const endRaw = sportCfg.gameEndHourET; if (endRaw >= 24) { return h >= start || h < (endRaw - 24); } return h >= start && h < endRaw; } async function ntfyAlert(message) { // Fire-and-forget. Never let alerting failure kill the poller. try { await axios.post(`http://localhost:${NTFY_PORT}/${NTFY_TOPIC}`, message, { timeout: 5_000 }); } catch { /* swallow */ } } async function fetchEspnScoreboard(sportCfg) { await espnLimiter.waitForToken(); const res = await axios.get(sportCfg.espnScoreboard, { timeout: 10_000 }); const events = res.data?.events || []; return events.map((ev) => ({ id: String(ev.id), state: ev?.status?.type?.state, // 'pre' | 'in' | 'post' name: ev?.status?.type?.name, // STATUS_FINAL, STATUS_IN_PROGRESS, etc. competitions: ev.competitions, })); } async function fetchEspnBoxScore(sportCfg, gameId) { // The ?event= query param is REQUIRED — without it ESPN returns nothing. await espnLimiter.waitForToken(); const res = await axios.get(`${sportCfg.espnSummary}?event=${encodeURIComponent(gameId)}`, { timeout: 15_000, }); return res.data; } async function fetchMlbBoxScore(sportCfg, gamePk) { await mlbLimiter.waitForToken(); const res = await axios.get(`${sportCfg.mlbStatsApiBase}/game/${gamePk}/feed/live`, { timeout: 15_000, }); return res.data; } function extractMlbGamePk(espnEvent) { // ESPN MLB events sometimes carry the MLB Stats API gamePk via a sibling // ID on the competition. Common shapes: // ev.competitions[0].uid "s:1~l:10~e:401472045~c:401472045" // ev.competitions[0].externalIds?.mlb const comp = espnEvent?.competitions?.[0]; if (!comp) return null; if (comp.externalIds?.mlb) return String(comp.externalIds.mlb); if (comp.gamePk) return String(comp.gamePk); // Fall back to ESPN event id as a last resort — caller logs if MLB fails. return null; } function validateBoxScore(data, sportCfg) { if (!data) return { valid: false, reason: 'no_data' }; if (sportCfg.useMlbStatsApi) { const teams = data?.liveData?.boxscore?.teams; if (!teams?.home || !teams?.away) return { valid: false, reason: 'mlb_missing_teams' }; return { valid: true }; } const players = data?.boxscore?.players; if (!Array.isArray(players) || players.length < 2) { return { valid: false, reason: 'missing_players' }; } // For category-based sports (NFL) the inner shape differs from basketball // (statistics array) — both at least exist. if (!players[0]?.statistics) return { valid: false, reason: 'no_statistics' }; return { valid: true }; } async function postResolution(payload, attempt = 1) { const maxAttempts = 3; try { const res = await axios.post( `${VYNDR_API_URL}/api/grading/resolve`, payload, { headers: { 'Content-Type': 'application/json', 'X-VYNDR-Internal-Key': process.env.VYNDR_INTERNAL_KEY || '', }, timeout: 30_000, validateStatus: (s) => s >= 200 && s < 500, } ); if (res.status >= 200 && res.status < 300) { console.log(`[poller-${SPORT}] POST /api/grading/resolve → ${res.status} (${res.data?.resolved ?? 0} resolved, ${res.data?.voided ?? 0} voided)`); return res.data; } throw new Error(`status=${res.status}`); } catch (err) { if (attempt < maxAttempts) { console.warn(`[poller-${SPORT}] POST attempt ${attempt} failed: ${err.message}. retrying in 30s.`); await new Promise((r) => setTimeout(r, 30_000)); return postResolution(payload, attempt + 1); } await ntfyAlert(`VYNDR poller-${SPORT}: 3x POST /api/grading/resolve failed for game ${payload.gameId}`); return null; } } async function tryAcquireLock(gameId) { // SET NX EX — atomic check-and-set with TTL. ioredis returns 'OK' on win. const redis = getRedisClient(); const result = await redis.set(`game:${gameId}:resolution_lock`, '1', 'EX', LOCK_TTL, 'NX'); return result === 'OK'; } async function handleGame(game, sportCfg) { const statusKey = `game:${game.id}:status`; const liveKey = `game:${game.id}:live_status`; const prevStatus = await cacheGet(statusKey); const currentStatus = game.name; // Always update live_status for the frontend badges. await cacheSet(liveKey, currentStatus, LIVE_STATUS_TTL); // No-op if we've already processed this exact status. if (prevStatus === currentStatus) return; if (currentStatus === 'STATUS_IN_PROGRESS' && prevStatus !== 'STATUS_IN_PROGRESS') { console.log(`[poller-${SPORT}] tip-off ${game.id} — triggering OddsPapi capture`); try { await oddsPapiAdapter.batchCapture(SPORT, game.id); } catch (err) { console.warn(`[poller-${SPORT}] OddsPapi capture failed: ${err.message}`); } await cacheSet(statusKey, currentStatus, STATUS_TTL); return; } if (isVoidStatus(currentStatus)) { if (!(await tryAcquireLock(game.id))) return; console.log(`[poller-${SPORT}] ${game.id} → ${currentStatus} (void)`); await postResolution({ gameId: game.id, sport: SPORT, void: true, reason: currentStatus.toLowerCase() }); await cacheSet(statusKey, currentStatus, STATUS_TTL); return; } if (isFinalStatus(currentStatus)) { if (!(await tryAcquireLock(game.id))) return; await new Promise((r) => setTimeout(r, BUFFER_MS)); let boxScore; try { boxScore = sportCfg.useMlbStatsApi ? await fetchMlbBoxScore(sportCfg, extractMlbGamePk(game) || game.id) : await fetchEspnBoxScore(sportCfg, game.id); } catch (err) { console.warn(`[poller-${SPORT}] box-score fetch failed for ${game.id}: ${err.message}`); await ntfyAlert(`VYNDR poller-${SPORT}: box-score fetch failed for game ${game.id}`); return; } const verdict = validateBoxScore(boxScore, sportCfg); if (!verdict.valid) { console.warn(`[poller-${SPORT}] invalid box score for ${game.id}: ${verdict.reason}`); await ntfyAlert(`VYNDR poller-${SPORT}: invalid box score (${verdict.reason}) for game ${game.id}`); return; } await postResolution({ gameId: game.id, sport: SPORT, boxScore }); await cacheSet(statusKey, currentStatus, STATUS_TTL); return; } // Any other status we just remember so we don't re-print on every tick. await cacheSet(statusKey, currentStatus, STATUS_TTL); } async function tick(sportCfg) { await cacheSet(`poller:${SPORT}:heartbeat`, new Date().toISOString(), HEARTBEAT_TTL); let games; try { games = await fetchEspnScoreboard(sportCfg); } catch (err) { console.warn(`[poller-${SPORT}] scoreboard fetch failed: ${err.message}`); return; } for (const g of games) { try { await handleGame(g, sportCfg); } catch (err) { console.warn(`[poller-${SPORT}] game ${g.id} handler error: ${err.message}`); } } } async function main() { if (!SPORT) { console.error('SPORT env var is required'); process.exit(1); } let sportCfg; try { sportCfg = getSportConfig(SPORT); } catch (err) { console.error(err.message); process.exit(1); } console.log(`[poller-${SPORT}] starting — pollInterval=${POLL_INTERVAL_MS}ms buffer=${BUFFER_MS}ms`); // Run forever. The PM2 supervisor restarts on crash; tick errors are // already caught inside. /* eslint-disable no-constant-condition */ while (true) { await tick(sportCfg); const intervalMs = inGameHours(sportCfg) ? POLL_INTERVAL_MS : OFF_HOURS_POLL_MS; await new Promise((r) => setTimeout(r, intervalMs)); } } // Surface for tests — they import individual handlers without firing main(). module.exports = { isFinalStatus, isVoidStatus, validateBoxScore, inGameHours, getETHour, extractMlbGamePk, handleGame, postResolution, // Internal — tests may need to clear/inspect state. __testing: { espnLimiter, mlbLimiter }, }; if (require.main === module) { main().catch((err) => { console.error('[poller] fatal:', err); process.exit(1); }); }