286 lines
10 KiB
JavaScript
286 lines
10 KiB
JavaScript
/**
|
|
* ESPN resolution poller — one PM2 process per sport.
|
|
*
|
|
* Two jobs:
|
|
* 1. First time we see STATUS_IN_PROGRESS for a game → trigger OddsPapi
|
|
* batchCapture for closing lines (CLV reference).
|
|
* 2. First time we see a FINAL status → wait BUFFER_MS, fetch box score,
|
|
* POST to /api/grading/resolve.
|
|
*
|
|
* Idempotency is enforced via two Redis keys per game:
|
|
* - game:{id}:status — last status we processed (TTL 36h)
|
|
* - game:{id}:resolution_lock — set during POST attempt (TTL 5min)
|
|
*
|
|
* Never logs VYNDR_INTERNAL_KEY. Headers are constructed inline so the key
|
|
* never lives in an exported constant that could leak via stack traces.
|
|
*/
|
|
|
|
const axios = require('axios');
|
|
const { getSportConfig } = require('../src/config/sports');
|
|
const { cacheGet, cacheSet, getRedisClient } = require('../src/utils/redis');
|
|
const { createLimiter, API_BUDGETS } = require('../src/utils/rateLimiter');
|
|
const oddsPapiAdapter = require('../src/services/adapters/oddsPapiAdapter');
|
|
|
|
const SPORT = (process.env.SPORT || '').toLowerCase();
|
|
const POLL_INTERVAL_MS = Number(process.env.POLL_INTERVAL) || 60_000;
|
|
const BUFFER_MS = Number(process.env.BUFFER_MS) || 30_000;
|
|
const OFF_HOURS_POLL_MS = 5 * 60_000;
|
|
const VYNDR_API_URL = process.env.VYNDR_API_URL || 'http://localhost:3001';
|
|
const NTFY_TOPIC = process.env.NTFY_TOPIC || 'vyndr-admin';
|
|
const NTFY_PORT = process.env.NTFY_PORT || '8080';
|
|
|
|
const STATUS_TTL = 36 * 60 * 60; // 36h, covers doubleheaders
|
|
const LOCK_TTL = 5 * 60; // 5 min
|
|
const LIVE_STATUS_TTL = 60 * 60; // 1h
|
|
const HEARTBEAT_TTL = 180; // 3 min
|
|
|
|
const espnLimiter = createLimiter(API_BUDGETS.espn);
|
|
const mlbLimiter = createLimiter(API_BUDGETS.mlbStats);
|
|
|
|
function isFinalStatus(state) {
|
|
if (!state) return false;
|
|
const upper = String(state).toUpperCase();
|
|
return upper.includes('FINAL');
|
|
}
|
|
|
|
function isVoidStatus(state) {
|
|
const upper = String(state || '').toUpperCase();
|
|
return upper.includes('POSTPONED') || upper.includes('CANCELED') || upper.includes('CANCELLED');
|
|
}
|
|
|
|
function getETHour() {
|
|
const fmt = new Intl.DateTimeFormat('en-US', {
|
|
timeZone: 'America/New_York', hour: 'numeric', hour12: false,
|
|
});
|
|
return parseInt(fmt.format(new Date()), 10);
|
|
}
|
|
|
|
function inGameHours(sportCfg) {
|
|
const h = getETHour();
|
|
// gameEndHourET can be 25 to represent past-midnight ET — wrap with mod.
|
|
const start = sportCfg.gameStartHourET;
|
|
const endRaw = sportCfg.gameEndHourET;
|
|
if (endRaw >= 24) {
|
|
return h >= start || h < (endRaw - 24);
|
|
}
|
|
return h >= start && h < endRaw;
|
|
}
|
|
|
|
async function ntfyAlert(message) {
|
|
// Fire-and-forget. Never let alerting failure kill the poller.
|
|
try {
|
|
await axios.post(`http://localhost:${NTFY_PORT}/${NTFY_TOPIC}`, message, { timeout: 5_000 });
|
|
} catch { /* swallow */ }
|
|
}
|
|
|
|
async function fetchEspnScoreboard(sportCfg) {
|
|
await espnLimiter.waitForToken();
|
|
const res = await axios.get(sportCfg.espnScoreboard, { timeout: 10_000 });
|
|
const events = res.data?.events || [];
|
|
return events.map((ev) => ({
|
|
id: String(ev.id),
|
|
state: ev?.status?.type?.state, // 'pre' | 'in' | 'post'
|
|
name: ev?.status?.type?.name, // STATUS_FINAL, STATUS_IN_PROGRESS, etc.
|
|
competitions: ev.competitions,
|
|
}));
|
|
}
|
|
|
|
async function fetchEspnBoxScore(sportCfg, gameId) {
|
|
// The ?event= query param is REQUIRED — without it ESPN returns nothing.
|
|
await espnLimiter.waitForToken();
|
|
const res = await axios.get(`${sportCfg.espnSummary}?event=${encodeURIComponent(gameId)}`, {
|
|
timeout: 15_000,
|
|
});
|
|
return res.data;
|
|
}
|
|
|
|
async function fetchMlbBoxScore(sportCfg, gamePk) {
|
|
await mlbLimiter.waitForToken();
|
|
const res = await axios.get(`${sportCfg.mlbStatsApiBase}/game/${gamePk}/feed/live`, {
|
|
timeout: 15_000,
|
|
});
|
|
return res.data;
|
|
}
|
|
|
|
function extractMlbGamePk(espnEvent) {
|
|
// ESPN MLB events sometimes carry the MLB Stats API gamePk via a sibling
|
|
// ID on the competition. Common shapes:
|
|
// ev.competitions[0].uid "s:1~l:10~e:401472045~c:401472045"
|
|
// ev.competitions[0].externalIds?.mlb
|
|
const comp = espnEvent?.competitions?.[0];
|
|
if (!comp) return null;
|
|
if (comp.externalIds?.mlb) return String(comp.externalIds.mlb);
|
|
if (comp.gamePk) return String(comp.gamePk);
|
|
// Fall back to ESPN event id as a last resort — caller logs if MLB fails.
|
|
return null;
|
|
}
|
|
|
|
function validateBoxScore(data, sportCfg) {
|
|
if (!data) return { valid: false, reason: 'no_data' };
|
|
if (sportCfg.useMlbStatsApi) {
|
|
const teams = data?.liveData?.boxscore?.teams;
|
|
if (!teams?.home || !teams?.away) return { valid: false, reason: 'mlb_missing_teams' };
|
|
return { valid: true };
|
|
}
|
|
const players = data?.boxscore?.players;
|
|
if (!Array.isArray(players) || players.length < 2) {
|
|
return { valid: false, reason: 'missing_players' };
|
|
}
|
|
// For category-based sports (NFL) the inner shape differs from basketball
|
|
// (statistics array) — both at least exist.
|
|
if (!players[0]?.statistics) return { valid: false, reason: 'no_statistics' };
|
|
return { valid: true };
|
|
}
|
|
|
|
async function postResolution(payload, attempt = 1) {
|
|
const maxAttempts = 3;
|
|
try {
|
|
const res = await axios.post(
|
|
`${VYNDR_API_URL}/api/grading/resolve`,
|
|
payload,
|
|
{
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
'X-VYNDR-Internal-Key': process.env.VYNDR_INTERNAL_KEY || '',
|
|
},
|
|
timeout: 30_000,
|
|
validateStatus: (s) => s >= 200 && s < 500,
|
|
}
|
|
);
|
|
if (res.status >= 200 && res.status < 300) {
|
|
console.log(`[poller-${SPORT}] POST /api/grading/resolve → ${res.status} (${res.data?.resolved ?? 0} resolved, ${res.data?.voided ?? 0} voided)`);
|
|
return res.data;
|
|
}
|
|
throw new Error(`status=${res.status}`);
|
|
} catch (err) {
|
|
if (attempt < maxAttempts) {
|
|
console.warn(`[poller-${SPORT}] POST attempt ${attempt} failed: ${err.message}. retrying in 30s.`);
|
|
await new Promise((r) => setTimeout(r, 30_000));
|
|
return postResolution(payload, attempt + 1);
|
|
}
|
|
await ntfyAlert(`VYNDR poller-${SPORT}: 3x POST /api/grading/resolve failed for game ${payload.gameId}`);
|
|
return null;
|
|
}
|
|
}
|
|
|
|
async function tryAcquireLock(gameId) {
|
|
// SET NX EX — atomic check-and-set with TTL. ioredis returns 'OK' on win.
|
|
const redis = getRedisClient();
|
|
const result = await redis.set(`game:${gameId}:resolution_lock`, '1', 'EX', LOCK_TTL, 'NX');
|
|
return result === 'OK';
|
|
}
|
|
|
|
async function handleGame(game, sportCfg) {
|
|
const statusKey = `game:${game.id}:status`;
|
|
const liveKey = `game:${game.id}:live_status`;
|
|
const prevStatus = await cacheGet(statusKey);
|
|
const currentStatus = game.name;
|
|
|
|
// Always update live_status for the frontend badges.
|
|
await cacheSet(liveKey, currentStatus, LIVE_STATUS_TTL);
|
|
|
|
// No-op if we've already processed this exact status.
|
|
if (prevStatus === currentStatus) return;
|
|
|
|
if (currentStatus === 'STATUS_IN_PROGRESS' && prevStatus !== 'STATUS_IN_PROGRESS') {
|
|
console.log(`[poller-${SPORT}] tip-off ${game.id} — triggering OddsPapi capture`);
|
|
try { await oddsPapiAdapter.batchCapture(SPORT, game.id); }
|
|
catch (err) { console.warn(`[poller-${SPORT}] OddsPapi capture failed: ${err.message}`); }
|
|
await cacheSet(statusKey, currentStatus, STATUS_TTL);
|
|
return;
|
|
}
|
|
|
|
if (isVoidStatus(currentStatus)) {
|
|
if (!(await tryAcquireLock(game.id))) return;
|
|
console.log(`[poller-${SPORT}] ${game.id} → ${currentStatus} (void)`);
|
|
await postResolution({ gameId: game.id, sport: SPORT, void: true, reason: currentStatus.toLowerCase() });
|
|
await cacheSet(statusKey, currentStatus, STATUS_TTL);
|
|
return;
|
|
}
|
|
|
|
if (isFinalStatus(currentStatus)) {
|
|
if (!(await tryAcquireLock(game.id))) return;
|
|
await new Promise((r) => setTimeout(r, BUFFER_MS));
|
|
let boxScore;
|
|
try {
|
|
boxScore = sportCfg.useMlbStatsApi
|
|
? await fetchMlbBoxScore(sportCfg, extractMlbGamePk(game) || game.id)
|
|
: await fetchEspnBoxScore(sportCfg, game.id);
|
|
} catch (err) {
|
|
console.warn(`[poller-${SPORT}] box-score fetch failed for ${game.id}: ${err.message}`);
|
|
await ntfyAlert(`VYNDR poller-${SPORT}: box-score fetch failed for game ${game.id}`);
|
|
return;
|
|
}
|
|
const verdict = validateBoxScore(boxScore, sportCfg);
|
|
if (!verdict.valid) {
|
|
console.warn(`[poller-${SPORT}] invalid box score for ${game.id}: ${verdict.reason}`);
|
|
await ntfyAlert(`VYNDR poller-${SPORT}: invalid box score (${verdict.reason}) for game ${game.id}`);
|
|
return;
|
|
}
|
|
await postResolution({ gameId: game.id, sport: SPORT, boxScore });
|
|
await cacheSet(statusKey, currentStatus, STATUS_TTL);
|
|
return;
|
|
}
|
|
|
|
// Any other status we just remember so we don't re-print on every tick.
|
|
await cacheSet(statusKey, currentStatus, STATUS_TTL);
|
|
}
|
|
|
|
async function tick(sportCfg) {
|
|
await cacheSet(`poller:${SPORT}:heartbeat`, new Date().toISOString(), HEARTBEAT_TTL);
|
|
let games;
|
|
try { games = await fetchEspnScoreboard(sportCfg); }
|
|
catch (err) {
|
|
console.warn(`[poller-${SPORT}] scoreboard fetch failed: ${err.message}`);
|
|
return;
|
|
}
|
|
for (const g of games) {
|
|
try { await handleGame(g, sportCfg); }
|
|
catch (err) { console.warn(`[poller-${SPORT}] game ${g.id} handler error: ${err.message}`); }
|
|
}
|
|
}
|
|
|
|
async function main() {
|
|
if (!SPORT) {
|
|
console.error('SPORT env var is required');
|
|
process.exit(1);
|
|
}
|
|
let sportCfg;
|
|
try { sportCfg = getSportConfig(SPORT); }
|
|
catch (err) {
|
|
console.error(err.message);
|
|
process.exit(1);
|
|
}
|
|
console.log(`[poller-${SPORT}] starting — pollInterval=${POLL_INTERVAL_MS}ms buffer=${BUFFER_MS}ms`);
|
|
|
|
// Run forever. The PM2 supervisor restarts on crash; tick errors are
|
|
// already caught inside.
|
|
/* eslint-disable no-constant-condition */
|
|
while (true) {
|
|
await tick(sportCfg);
|
|
const intervalMs = inGameHours(sportCfg) ? POLL_INTERVAL_MS : OFF_HOURS_POLL_MS;
|
|
await new Promise((r) => setTimeout(r, intervalMs));
|
|
}
|
|
}
|
|
|
|
// Surface for tests — they import individual handlers without firing main().
|
|
module.exports = {
|
|
isFinalStatus,
|
|
isVoidStatus,
|
|
validateBoxScore,
|
|
inGameHours,
|
|
getETHour,
|
|
extractMlbGamePk,
|
|
handleGame,
|
|
postResolution,
|
|
// Internal — tests may need to clear/inspect state.
|
|
__testing: { espnLimiter, mlbLimiter },
|
|
};
|
|
|
|
if (require.main === module) {
|
|
main().catch((err) => {
|
|
console.error('[poller] fatal:', err);
|
|
process.exit(1);
|
|
});
|
|
}
|