Session 20: Provider intelligence — quota tracker, gateway with fallback cascade, admin quota dashboard (1476 tests)

This commit is contained in:
Kev
2026-06-12 00:54:39 -04:00
parent 56392ec8f4
commit 9b10bb4138
17 changed files with 1422 additions and 15 deletions
+149
View File
@@ -0,0 +1,149 @@
'use strict';
/**
* Provider registry (Session 20).
*
* Every external data provider VYNDR talks to is enumerated here.
* Each entry declares:
* - envKey — the environment variable holding the API key
* (presence = the provider is configured)
* - quotaType — 'monthly' | 'daily' | 'per_minute'
* - quotaLimit — calls allowed per quotaType period
* - resetDay — for monthly quotas, day-of-month the counter
* resets (1 = first of the month). null otherwise.
* - sports — which sport keys this provider covers
* - capabilities — what kinds of data it can return
* - priority — 1 = primary for its capability set; higher
* numbers are fallbacks
*
* The quotaTracker keys off provider IDs from this map. Wiring a
* new provider = adding it here + having its adapter route through
* providerGateway.fetch(providerId, callback, opts).
*
* IMPORTANT: keep quotaLimit conservative. Over-counting throttles
* the platform under-load; under-counting blows the budget. If a
* provider's actual limit changes (e.g. plan upgrade), update this
* number — the tracker re-reads it each call.
*/
const PROVIDERS = {
// === ODDS / LINES ===
'odds-api': {
name: 'The Odds API',
envKey: 'ODDS_API_KEY',
quotaType: 'monthly',
quotaLimit: 500,
resetDay: 1,
sports: ['nba', 'wnba', 'mlb', 'soccer_wc', 'nfl', 'nhl'],
capabilities: ['odds', 'props', 'lines', 'spreads'],
priority: 1,
},
'oddspapi': {
name: 'ODDSPAPI',
envKey: 'ODDSPAPI_KEY',
quotaType: 'monthly',
quotaLimit: 1000,
resetDay: 1,
sports: ['nba', 'wnba', 'mlb', 'nfl'],
capabilities: ['odds', 'props'],
priority: 2,
},
'parlayapi': {
name: 'ParlayAPI',
envKey: 'PARLAYAPI_KEY',
quotaType: 'monthly',
quotaLimit: 1000,
resetDay: 1,
sports: ['nba', 'wnba', 'mlb', 'nfl'],
capabilities: ['odds', 'parlays', 'correlations'],
priority: 3,
},
// === STATS / BOX SCORES ===
'tank01': {
name: 'Tank01 (RapidAPI)',
envKey: 'RAPID_API_KEY',
quotaType: 'monthly',
quotaLimit: 1000,
resetDay: 1,
sports: ['nba', 'mlb'],
capabilities: ['box_scores', 'schedules', 'player_stats', 'bvp'],
priority: 1,
},
// === SOCCER ===
'api-football': {
name: 'API-Football',
envKey: 'API_FOOTBALL_KEY',
quotaType: 'daily',
quotaLimit: 100,
resetDay: null,
sports: ['soccer_wc', 'soccer'],
capabilities: ['lineups', 'player_stats', 'match_events', 'live_scores'],
priority: 1,
},
'football-data': {
name: 'Football-Data.org',
envKey: 'FOOTBALL_DATA_API_KEY',
quotaType: 'per_minute',
quotaLimit: 10,
resetDay: null,
sports: ['soccer_wc', 'soccer'],
capabilities: ['standings', 'fixtures', 'scorers'],
priority: 2,
},
};
/**
* Threshold constants — shared by quotaTracker and providerGateway
* so the WARN/BLOCK lines stay in lockstep.
*/
const THRESHOLDS = Object.freeze({
WARN_PCT: 0.80,
BLOCK_PCT: 0.95,
});
function getProvider(providerId) {
return PROVIDERS[providerId] || null;
}
function listProviderIds() {
return Object.keys(PROVIDERS);
}
/**
* Subset of providers whose envKey is set. Logged at startup; used
* by the admin dashboard to render only providers the operator has
* actually wired up.
*/
function getConfiguredProviders() {
return Object.entries(PROVIDERS)
.filter(([, cfg]) => !!process.env[cfg.envKey])
.map(([id, cfg]) => ({ id, ...cfg }));
}
/**
* Fallback chain for a capability + sport, in priority order,
* excluding `excludeId`. Used by the gateway to walk down to the
* next provider when the primary is exhausted.
*/
function getFallbackChain(capability, sport, excludeId) {
return Object.entries(PROVIDERS)
.filter(([id, cfg]) =>
id !== excludeId &&
cfg.capabilities.includes(capability) &&
(!sport || cfg.sports.includes(sport)) &&
!!process.env[cfg.envKey],
)
.sort((a, b) => a[1].priority - b[1].priority)
.map(([id]) => id);
}
module.exports = {
PROVIDERS,
THRESHOLDS,
getProvider,
listProviderIds,
getConfiguredProviders,
getFallbackChain,
};
+20
View File
@@ -17,11 +17,31 @@
const express = require('express');
const { requireInternalAuth } = require('../middleware/internalAuth');
const tank01Prefetch = require('../../scripts/tank01-prefetch');
const quotaTracker = require('../services/quotaTracker');
const router = express.Router();
router.use(requireInternalAuth({ loopbackOnly: false }));
/**
* GET /api/internal/quota (Session 20)
*
* Snapshot of every configured provider's current quota counter.
* Consumed by the admin dashboard's "Provider Quotas" tile. Cached
* for 5s so a refresh-button mash doesn't flood Redis.
*/
router.get('/quota', async (req, res) => {
try {
const providers = await quotaTracker.getAllQuotaStatuses();
res.set('Cache-Control', 'private, max-age=5');
return res.json({ ok: true, providers });
} catch (err) {
const message = err && err.message ? err.message : String(err);
console.error('[internal/quota] failed:', message);
return res.status(500).json({ ok: false, error: message });
}
});
/**
* POST /api/internal/prefetch/tank01
*
+11
View File
@@ -1,4 +1,9 @@
const app = require('./app');
// Session 20 — surface which providers are actually configured at
// boot. A silently-missing key (e.g. ODDSPAPI_KEY unset in prod)
// otherwise only manifests when the gateway tries to fall over and
// finds no chain.
const { getConfiguredProviders, listProviderIds } = require('./config/providers');
// Default 3001 — Next.js owns 3000 locally and in production. The poller,
// internal cron, and BASE_URL conventions all assume 3001 for the Express
@@ -7,4 +12,10 @@ const PORT = process.env.PORT || 3001;
app.listen(PORT, () => {
console.log(`[VYNDR] Server running on port ${PORT}`);
const configured = getConfiguredProviders();
const missing = listProviderIds().filter((id) => !configured.find((c) => c.id === id));
console.log(`[VYNDR] providers configured (${configured.length}): ${configured.map((c) => c.id).join(', ') || 'none'}`);
if (missing.length) {
console.warn(`[VYNDR] providers missing keys: ${missing.join(', ')}`);
}
});
+49 -13
View File
@@ -1,6 +1,12 @@
const axios = require('axios');
const { getRedisClient } = require('../utils/redis');
const { normalizeProps, extractSpreads, MARKET_MAP } = require('../utils/oddsNormalizer');
// Session 20 — every odds-api hit now flows through the gateway so
// the quota counter advances and the fallback chain (oddspapi,
// parlayapi) can take over when we approach the monthly cap. The
// gateway is intentionally light-weight on the hot path — it adds
// one Redis GET + SET per call (degrades open if Redis is down).
const gateway = require('./providerGateway');
const ODDS_API_BASE = 'https://api.the-odds-api.com/v4/sports';
const CACHE_TTL = 900; // 15 minutes in seconds
@@ -137,6 +143,19 @@ async function getQuotaRemaining(redis) {
async function updateQuota(redis, headers) {
const remaining = headers['x-requests-remaining'];
const used = headers['x-requests-used'];
// Session 20 — sync the per-provider tracker from the same
// headers. The gateway's syncHeadersFrom already does this on
// each call; doing it here too is belt-and-suspenders for any
// call path that bypassed the gateway. Lazy-required so the
// tests that don't mock providerGateway don't crash on load.
try {
const quotaTracker = require('./quotaTracker');
await quotaTracker.syncFromHeaders('odds-api', headers);
} catch (e) {
// Tracker failure must never break odds delivery — it's a
// signal, not a dependency.
console.warn('[oddsService] quotaTracker sync failed:', e.message);
}
if (remaining != null) {
const key = getQuotaKey();
await redis.hset(key, 'remaining', String(remaining), 'used', String(used || 0), 'last_checked', new Date().toISOString());
@@ -150,10 +169,19 @@ async function updateQuota(redis, headers) {
async function fetchEventsFromApi(sportKey, apiKey) {
const url = `${ODDS_API_BASE}/${sportKey}/events`;
const response = await axios.get(url, {
params: { apiKey },
timeout: 10000,
});
const response = await gateway.fetch(
'odds-api',
() => axios.get(url, { params: { apiKey }, timeout: 10000 }),
{
capability: 'odds',
// Best-effort sport tag for the fallback chain. The events
// endpoint isn't sport-scoped on the fallback providers, but
// passing it through lets the registry filter to relevant
// candidates.
sport: sportKey.replace(/^.*?_/, ''),
syncHeadersFrom: (r) => r && r.headers,
},
);
return { data: response.data, headers: response.headers };
}
@@ -163,16 +191,24 @@ async function fetchEventsFromApi(sportKey, apiKey) {
// market set, which is what every legacy caller assumed.
async function fetchEventOddsFromApi(sportKey, eventId, apiKey, sport) {
const url = `${ODDS_API_BASE}/${sportKey}/events/${eventId}/odds`;
const response = await axios.get(url, {
params: {
apiKey,
regions: 'us',
markets: getMarketsForSport(sport),
bookmakers: BOOKMAKERS,
oddsFormat: 'american',
const response = await gateway.fetch(
'odds-api',
() => axios.get(url, {
params: {
apiKey,
regions: 'us',
markets: getMarketsForSport(sport),
bookmakers: BOOKMAKERS,
oddsFormat: 'american',
},
timeout: 10000,
}),
{
capability: 'odds',
sport,
syncHeadersFrom: (r) => r && r.headers,
},
timeout: 10000,
});
);
return { data: response.data, headers: response.headers };
}
+133
View File
@@ -0,0 +1,133 @@
'use strict';
/**
* Provider gateway (Session 20).
*
* The single entry point every external-data call passes through.
* Adapters call:
*
* const result = await gateway.fetch('odds-api', cbWithProvider, {
* capability: 'odds',
* sport: 'nba',
* fallbackProviders: ['oddspapi'], // optional override
* syncHeadersFrom: (r) => r.headers, // optional
* });
*
* Flow:
* 1. Check primary provider's quota via quotaTracker
* 2. If allowed → invoke callback, sync headers on success
* 3. If blocked → walk the fallback chain (explicit or
* capability-derived from the registry)
* 4. If every provider is exhausted → throw QuotaExhaustedError
* with a structured `attempts` log so the operator can see
* what was tried
* 5. Adapter-thrown errors propagate after rollback
*
* Callback receives the providerId actually being used so it can
* pick the right base URL / API key for fallbacks. For
* single-provider calls, callers can ignore the argument.
*/
const quotaTracker = require('./quotaTracker');
const { getFallbackChain } = require('../config/providers');
class QuotaExhaustedError extends Error {
constructor(primary, sport, attempts) {
super(`All providers exhausted for ${primary}/${sport || '*'}. Tried: ${attempts.map((a) => `${a.provider}=${a.reason}`).join('; ')}`);
this.name = 'QuotaExhaustedError';
this.code = 'QUOTA_EXHAUSTED';
this.statusCode = 503;
this.primary = primary;
this.sport = sport;
this.attempts = attempts;
}
}
async function tryOne(providerId, callbackFn, syncHeadersFrom) {
// Optimistic increment — if the call throws we roll back below.
// recordCall also evaluates the post-increment threshold; if the
// very next call would put us at 95%+, we still execute THIS one
// (it returned allowed:true before incrementing) and the NEXT
// call will see the block.
const status = await quotaTracker.recordCall(providerId);
if (!status.allowed) {
await quotaTracker.rollback(providerId);
return { ok: false, reason: status.reason || 'blocked', status };
}
try {
const result = await callbackFn(providerId);
// Best-effort header sync — caller signals where the headers
// live on the response object. Failure is non-fatal; the
// optimistic counter remains.
if (typeof syncHeadersFrom === 'function') {
try {
const headers = syncHeadersFrom(result);
if (headers) await quotaTracker.syncFromHeaders(providerId, headers);
} catch (e) {
console.warn(`[gateway] header sync failed for ${providerId}: ${e.message}`);
}
}
return { ok: true, result, provider: providerId };
} catch (err) {
await quotaTracker.rollback(providerId);
return { ok: false, reason: err && err.message ? err.message : 'error', err };
}
}
/**
* Invoke `callbackFn` against the primary provider, falling over
* to alternatives in the fallback chain if quota is exhausted.
*
* IMPORTANT: this only retries fallbacks on QUOTA failures, not on
* generic upstream errors. A network blip on the primary doesn't
* silently shift the entire platform to the fallback (that masks
* outages); it surfaces as the adapter's normal error path.
*/
async function fetch(primaryId, callbackFn, opts = {}) {
const {
capability,
sport,
fallbackProviders,
syncHeadersFrom,
} = opts;
const attempts = [];
const result = await tryOne(primaryId, callbackFn, syncHeadersFrom);
if (result.ok) return result.result;
// Generic adapter error on the primary — propagate, don't shift.
if (result.err) {
attempts.push({ provider: primaryId, reason: result.reason });
throw result.err;
}
attempts.push({ provider: primaryId, reason: result.reason });
// Build the fallback chain. Caller can override; otherwise derive
// from the capability/sport pair in the registry.
const chain = Array.isArray(fallbackProviders) && fallbackProviders.length
? fallbackProviders
: capability
? getFallbackChain(capability, sport, primaryId)
: [];
for (const fallbackId of chain) {
const fb = await tryOne(fallbackId, callbackFn, syncHeadersFrom);
if (fb.ok) {
console.log(`[gateway] primary=${primaryId} blocked; succeeded via fallback=${fallbackId}`);
return fb.result;
}
attempts.push({ provider: fallbackId, reason: fb.reason });
// Generic error on a fallback → record and continue to the next.
// We don't propagate fallback errors because the user only sees
// one final response, and the original primary was already
// unavailable when we entered this loop.
}
throw new QuotaExhaustedError(primaryId, sport, attempts);
}
module.exports = {
fetch,
QuotaExhaustedError,
};
+281
View File
@@ -0,0 +1,281 @@
'use strict';
/**
* Quota tracker (Session 20).
*
* Single source of truth for "how many calls have we made to each
* provider this period?" Pure Redis-backed counter — no in-process
* memory beyond a one-shot warn dedupe set per period.
*
* Redis key shape:
* quota:{providerId}:{period} → { used: number, limit: number, syncedAt?: iso }
* quota_warned:{providerId}:{period} → '1' (dedupes the 80% warn line)
*
* Period format:
* monthly: "YYYY-MM"
* daily: "YYYY-MM-DD"
* per_minute: "YYYY-MM-DDTHH:MM"
*
* Degraded mode: when Redis is unavailable, `getQuotaStatus`
* returns `{ allowed: true, degraded: true }` — i.e. we FAIL OPEN
* (allow the call) rather than block the whole platform on a Redis
* outage. The original quota-exhaustion bug we're guarding against
* is recoverable (a real upstream 429 stops the bleeding); a
* Redis-tied block would make every outage existential.
*/
const { cacheGet, cacheSet, isDegraded } = require('../utils/redis');
const { getProvider, THRESHOLDS, getConfiguredProviders } = require('../config/providers');
function pad(n) {
return String(n).padStart(2, '0');
}
function getPeriodKey(providerId, now = new Date()) {
const cfg = getProvider(providerId);
if (!cfg) return '';
switch (cfg.quotaType) {
case 'monthly':
return `${now.getUTCFullYear()}-${pad(now.getUTCMonth() + 1)}`;
case 'daily':
return `${now.getUTCFullYear()}-${pad(now.getUTCMonth() + 1)}-${pad(now.getUTCDate())}`;
case 'per_minute':
return `${now.getUTCFullYear()}-${pad(now.getUTCMonth() + 1)}-${pad(now.getUTCDate())}T${pad(now.getUTCHours())}:${pad(now.getUTCMinutes())}`;
default:
return `${now.getUTCFullYear()}-${pad(now.getUTCMonth() + 1)}-${pad(now.getUTCDate())}`;
}
}
/**
* Redis TTL for the counter key. Set so the counter naturally
* expires at the end of its period — no separate sweep job.
*
* Monthly: 35 days (slightly past the next reset; the new key
* takes over on day-1 of the new month anyway).
* Daily: 48 hours.
* Per-minute: 2 minutes.
*/
function getQuotaTTL(providerId) {
const cfg = getProvider(providerId);
if (!cfg) return 60;
switch (cfg.quotaType) {
case 'monthly': return 35 * 24 * 60 * 60;
case 'daily': return 2 * 24 * 60 * 60;
case 'per_minute': return 2 * 60;
default: return 24 * 60 * 60;
}
}
function buildKey(providerId, now = new Date()) {
return `quota:${providerId}:${getPeriodKey(providerId, now)}`;
}
function buildWarnKey(providerId, now = new Date()) {
return `quota_warned:${providerId}:${getPeriodKey(providerId, now)}`;
}
/**
* Read the counter without mutating it. Returns the structured
* status the admin dashboard renders + the gateway consults.
*/
async function getQuotaStatus(providerId) {
const cfg = getProvider(providerId);
if (!cfg) {
return {
provider: providerId,
allowed: false,
reason: 'unknown_provider',
used: 0, limit: 0, remaining: 0, pct: 1, period: '',
quotaType: 'unknown',
};
}
if (isDegraded()) {
// Fail open — the alternative (degrade closed) means a Redis
// hiccup takes down every provider call platform-wide.
return {
provider: providerId, name: cfg.name,
allowed: true, degraded: true,
used: 0, limit: cfg.quotaLimit, remaining: cfg.quotaLimit, pct: 0,
period: getPeriodKey(providerId), quotaType: cfg.quotaType,
};
}
const cached = await cacheGet(buildKey(providerId));
const used = (cached && typeof cached.used === 'number') ? cached.used : 0;
const limit = (cached && typeof cached.limit === 'number') ? cached.limit : cfg.quotaLimit;
const pct = limit > 0 ? used / limit : 0;
return {
provider: providerId,
name: cfg.name,
allowed: pct < THRESHOLDS.BLOCK_PCT,
used, limit,
remaining: Math.max(0, limit - used),
pct,
period: getPeriodKey(providerId),
quotaType: cfg.quotaType,
syncedAt: cached && cached.syncedAt ? cached.syncedAt : null,
};
}
/**
* Increment the counter by one. Returns the updated status.
*
* The warning at 80% fires once per period via a separate sentinel
* key so the log line doesn't repeat 200 times.
*/
async function recordCall(providerId) {
const cfg = getProvider(providerId);
if (!cfg) return { provider: providerId, allowed: false, reason: 'unknown_provider' };
if (isDegraded()) return { provider: providerId, allowed: true, degraded: true };
const key = buildKey(providerId);
const cached = await cacheGet(key);
const used = (cached && typeof cached.used === 'number') ? cached.used : 0;
const limit = (cached && typeof cached.limit === 'number') ? cached.limit : cfg.quotaLimit;
const nextUsed = used + 1;
const pct = limit > 0 ? nextUsed / limit : 0;
const payload = { used: nextUsed, limit, updatedAt: new Date().toISOString() };
if (cached && cached.syncedAt) payload.syncedAt = cached.syncedAt;
await cacheSet(key, payload, getQuotaTTL(providerId));
if (pct >= THRESHOLDS.WARN_PCT) {
const warnKey = buildWarnKey(providerId);
const already = await cacheGet(warnKey);
if (!already) {
console.warn(
`[quotaTracker] ${cfg.name} at ${(pct * 100).toFixed(0)}% quota (${nextUsed}/${limit}) for ${getPeriodKey(providerId)}`,
);
await cacheSet(warnKey, '1', getQuotaTTL(providerId));
}
}
return {
provider: providerId, name: cfg.name,
allowed: pct < THRESHOLDS.BLOCK_PCT,
used: nextUsed, limit,
remaining: Math.max(0, limit - nextUsed),
pct,
period: getPeriodKey(providerId),
quotaType: cfg.quotaType,
};
}
/**
* Decrement after a failed call. The optimistic recordCall+rollback
* pattern means a thread-unsafe race could overshoot by N during a
* burst — acceptable for our scale (single-instance Express,
* 510 concurrent calls peak). Atomic INCR would require a Lua
* script and the gain is marginal.
*/
async function rollback(providerId) {
const cfg = getProvider(providerId);
if (!cfg || isDegraded()) return;
const key = buildKey(providerId);
const cached = await cacheGet(key);
if (!cached || typeof cached.used !== 'number') return;
const nextUsed = Math.max(0, cached.used - 1);
await cacheSet(
key,
{ ...cached, used: nextUsed, updatedAt: new Date().toISOString() },
getQuotaTTL(providerId),
);
}
/**
* Sync from upstream response headers. odds-api returns
* `x-requests-used` + `x-requests-remaining`; this becomes the
* truth source for that provider's counter.
*
* Header presence varies — we look at common spellings.
*/
async function syncFromHeaders(providerId, headers) {
if (!headers || isDegraded()) return null;
const cfg = getProvider(providerId);
if (!cfg) return null;
// Normalize header lookup (axios returns lowercase, fetch keeps case).
const h = {};
for (const k of Object.keys(headers)) h[String(k).toLowerCase()] = headers[k];
const usedHdr = h['x-requests-used'] ?? h['x-quota-used'] ?? h['x-ratelimit-used'];
const remainingHdr = h['x-requests-remaining'] ?? h['x-quota-remaining'] ?? h['x-ratelimit-remaining'];
const limitHdr = h['x-quota-limit'] ?? h['x-ratelimit-limit'];
const used = Number.parseInt(usedHdr, 10);
const remaining = Number.parseInt(remainingHdr, 10);
const limitFromHdr = Number.parseInt(limitHdr, 10);
if (!Number.isFinite(used) && !Number.isFinite(remaining)) return null;
const limit = Number.isFinite(limitFromHdr)
? limitFromHdr
: (Number.isFinite(used) && Number.isFinite(remaining))
? used + remaining
: cfg.quotaLimit;
const resolvedUsed = Number.isFinite(used)
? used
: (Number.isFinite(remaining) ? Math.max(0, limit - remaining) : 0);
const payload = {
used: resolvedUsed,
limit,
syncedAt: new Date().toISOString(),
source: 'headers',
};
await cacheSet(buildKey(providerId), payload, getQuotaTTL(providerId));
return payload;
}
/**
* Status snapshot for every configured provider — admin dashboard
* tile, /api/internal/quota endpoint.
*/
async function getAllQuotaStatuses() {
const configured = getConfiguredProviders();
return Promise.all(configured.map((p) => getQuotaStatus(p.id)));
}
/**
* Frequency-scaling helper for schedulers. Returns the wait
* interval (ms) the caller SHOULD use before its next provider
* hit, or null when the quota is exhausted.
*
* The discrete steps match the Session 20 spec:
* <50% → 5 min (full speed)
* <80% → 15 min
* <95% → 30 min
* ≥95% → null (stop)
*/
function getTickInterval(pct) {
if (!Number.isFinite(pct)) return 5 * 60 * 1000;
if (pct >= THRESHOLDS.BLOCK_PCT) return null;
if (pct >= THRESHOLDS.WARN_PCT) return 30 * 60 * 1000;
if (pct >= 0.50) return 15 * 60 * 1000;
return 5 * 60 * 1000;
}
/**
* Composite helper for schedulers: should this poller wait or fire
* right now? Convenience over `getQuotaStatus + getTickInterval`.
*/
async function shouldThrottle(providerId) {
const status = await getQuotaStatus(providerId);
const interval = getTickInterval(status.pct);
return { allowed: status.allowed, interval, status };
}
module.exports = {
getPeriodKey,
buildKey,
buildWarnKey,
getQuotaStatus,
getAllQuotaStatuses,
recordCall,
rollback,
syncFromHeaders,
getTickInterval,
shouldThrottle,
// Exposed for tests that need to reach the threshold constants
// without re-importing the providers module.
THRESHOLDS,
};