busi488energy/scripts/backfill.ts
Joey Eamigh 7a1bbca339
fix: comprehensive review fixes — real price data, missing components, SQL bugs, security
- Replace $0 electricity prices with real EIA retail-sales data (IND sector)
  with demand-based hourly variation (0.8x-1.2x)
- Add sparkline component and alerts feed to dashboard home
- Add animated number transitions to hero metric cards
- Fix ticker tape price direction (green/red arrows with % change)
- Fix AI milestone annotation alignment on price charts
- Fix SQL cartesian products in getDemandByRegion and getRegionPriceHeatmap
  using CTEs for independent aggregation
- Add unique composite constraints to prevent duplicate data
- Add bearer token auth to ingestion API routes
- Add 30s fetch timeouts to EIA and FRED API clients
- Add regionCode validation in server actions
- Fix docker-compose: localhost-only port binding, correct volume path
- Fix seed script to preserve ingested time-series data
2026-02-11 13:23:21 -05:00

468 lines
16 KiB
TypeScript

/**
* Historical data backfill script.
*
* Populates 6 months of historical data from EIA and FRED into Postgres.
* Idempotent — safe to re-run; existing records are updated, not duplicated.
*
* Usage: bun run scripts/backfill.ts
*/
import 'dotenv/config';
import { PrismaPg } from '@prisma/adapter-pg';
import { PrismaClient } from '../src/generated/prisma/client.js';
import * as eia from '../src/lib/api/eia.js';
import { getFuelTypeData, getRegionData, getRetailElectricityPrices } from '../src/lib/api/eia.js';
import * as fred from '../src/lib/api/fred.js';
import type { RegionCode } from '../src/lib/schemas/electricity.js';
const adapter = new PrismaPg({ connectionString: process.env.DATABASE_URL });
const prisma = new PrismaClient({ adapter });
const ALL_REGIONS: RegionCode[] = ['PJM', 'ERCOT', 'CAISO', 'NYISO', 'ISONE', 'MISO', 'SPP'];
const SIX_MONTHS_MS = 6 * 30 * 24 * 60 * 60 * 1000;
function sixMonthsAgoIso(): string {
return new Date(Date.now() - SIX_MONTHS_MS).toISOString().slice(0, 10);
}
function todayIso(): string {
return new Date().toISOString().slice(0, 10);
}
function sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
function log(msg: string): void {
const ts = new Date().toISOString().slice(11, 19);
console.log(`[${ts}] ${msg}`);
}
// ---------------------------------------------------------------------------
// Electricity demand backfill
// ---------------------------------------------------------------------------
async function backfillElectricity(): Promise<void> {
log('=== Backfilling electricity demand + price data ===');
const gridRegions = await prisma.gridRegion.findMany({
select: { id: true, code: true },
});
const regionIdByCode = new Map(gridRegions.map(r => [r.code, r.id]));
const start = sixMonthsAgoIso();
const end = todayIso();
// Fetch monthly retail electricity prices for all regions upfront
// Key: "REGION:YYYY-MM" -> $/MWh
const retailPriceByRegionMonth = new Map<string, number>();
log(' Fetching retail electricity prices...');
try {
const startMonth = start.slice(0, 7); // YYYY-MM
const endMonth = end.slice(0, 7);
const retailPrices = await getRetailElectricityPrices({ start: startMonth, end: endMonth });
for (const rp of retailPrices) {
retailPriceByRegionMonth.set(`${rp.regionCode}:${rp.period}`, rp.priceMwh);
}
log(` Retail prices: ${retailPrices.length} records for ${retailPriceByRegionMonth.size} region-months`);
} catch (err) {
log(` ERROR fetching retail prices: ${err instanceof Error ? err.message : String(err)}`);
}
// Build a fallback: for each region, find the most recent month with data
const latestPriceByRegion = new Map<string, number>();
for (const [key, price] of retailPriceByRegionMonth) {
const region = key.split(':')[0]!;
const existing = latestPriceByRegion.get(region);
// Since keys are "REGION:YYYY-MM", the latest month lexicographically is the most recent
if (!existing || key > `${region}:${existing}`) {
latestPriceByRegion.set(region, price);
}
}
/** Look up price for a region+month, falling back to latest known price */
function getRetailPrice(region: string, month: string): number {
return retailPriceByRegionMonth.get(`${region}:${month}`) ?? latestPriceByRegion.get(region) ?? 0;
}
await sleep(200);
for (const regionCode of ALL_REGIONS) {
const regionId = regionIdByCode.get(regionCode);
if (!regionId) {
log(` SKIP ${regionCode} — no grid_region row found`);
continue;
}
log(` Fetching demand for ${regionCode}...`);
try {
const demandData = await getRegionData(regionCode, 'D', { start, end });
const validPoints = demandData.filter((p): p is typeof p & { valueMw: number } => p.valueMw !== null);
if (validPoints.length === 0) {
log(` ${regionCode}: 0 valid data points`);
continue;
}
// Check existing records to decide create vs update
const timestamps = validPoints.map(p => p.timestamp);
const existing = await prisma.electricityPrice.findMany({
where: { regionId, timestamp: { in: timestamps } },
select: { id: true, timestamp: true },
});
const existingByTime = new Map(existing.map(e => [e.timestamp.getTime(), e.id]));
// Find peak demand for demand-based price variation
const peakDemand = Math.max(...validPoints.map(p => p.valueMw));
const toCreate: Array<{
regionId: string;
priceMwh: number;
demandMw: number;
timestamp: Date;
source: string;
}> = [];
const toUpdate: Array<{ id: string; demandMw: number; priceMwh: number }> = [];
for (const point of validPoints) {
const month = point.timestamp.toISOString().slice(0, 7);
const basePrice = getRetailPrice(regionCode, month);
// Add demand-based variation: scale price between 0.8x and 1.2x based on demand
const demandRatio = peakDemand > 0 ? point.valueMw / peakDemand : 0.5;
const priceMwh = basePrice > 0 ? basePrice * (0.8 + 0.4 * demandRatio) : 0;
const existingId = existingByTime.get(point.timestamp.getTime());
if (existingId) {
toUpdate.push({ id: existingId, demandMw: point.valueMw, priceMwh });
} else {
toCreate.push({
regionId,
priceMwh,
demandMw: point.valueMw,
timestamp: point.timestamp,
source: 'EIA',
});
}
}
if (toCreate.length > 0) {
const result = await prisma.electricityPrice.createMany({ data: toCreate });
log(` ${regionCode}: ${result.count} records inserted`);
}
if (toUpdate.length > 0) {
// Batch updates in chunks of 100 to avoid transaction timeouts
const chunkSize = 100;
for (let i = 0; i < toUpdate.length; i += chunkSize) {
const chunk = toUpdate.slice(i, i + chunkSize);
await prisma.$transaction(
chunk.map(u =>
prisma.electricityPrice.update({
where: { id: u.id },
data: { demandMw: u.demandMw, priceMwh: u.priceMwh, source: 'EIA' },
}),
),
);
}
log(` ${regionCode}: ${toUpdate.length} records updated`);
}
if (toCreate.length === 0 && toUpdate.length === 0) {
log(` ${regionCode}: no changes needed`);
}
} catch (err) {
log(` ERROR ${regionCode}: ${err instanceof Error ? err.message : String(err)}`);
}
// Rate limit: 200ms between regions
await sleep(200);
}
}
// ---------------------------------------------------------------------------
// Generation mix backfill
// ---------------------------------------------------------------------------
async function backfillGeneration(): Promise<void> {
log('=== Backfilling generation mix data ===');
const gridRegions = await prisma.gridRegion.findMany({
select: { id: true, code: true },
});
const regionIdByCode = new Map(gridRegions.map(r => [r.code, r.id]));
const start = sixMonthsAgoIso();
const end = todayIso();
for (const regionCode of ALL_REGIONS) {
const regionId = regionIdByCode.get(regionCode);
if (!regionId) {
log(` SKIP ${regionCode} — no grid_region row found`);
continue;
}
log(` Fetching generation mix for ${regionCode}...`);
try {
const fuelData = await getFuelTypeData(regionCode, { start, end });
const validPoints = fuelData.filter((p): p is typeof p & { generationMw: number } => p.generationMw !== null);
if (validPoints.length === 0) {
log(` ${regionCode}: 0 valid data points`);
continue;
}
const timestamps = validPoints.map(p => p.timestamp);
const existing = await prisma.generationMix.findMany({
where: { regionId, timestamp: { in: timestamps } },
select: { id: true, timestamp: true, fuelType: true },
});
const existingKeys = new Map(existing.map(e => [`${e.fuelType}:${e.timestamp.getTime()}`, e.id]));
const toCreate: Array<{
regionId: string;
fuelType: string;
generationMw: number;
timestamp: Date;
}> = [];
const toUpdate: Array<{ id: string; generationMw: number }> = [];
for (const point of validPoints) {
const key = `${point.fuelType}:${point.timestamp.getTime()}`;
const existingId = existingKeys.get(key);
if (existingId) {
toUpdate.push({ id: existingId, generationMw: point.generationMw });
} else {
toCreate.push({
regionId,
fuelType: point.fuelType,
generationMw: point.generationMw,
timestamp: point.timestamp,
});
}
}
if (toCreate.length > 0) {
const result = await prisma.generationMix.createMany({ data: toCreate });
log(` ${regionCode}: ${result.count} generation records inserted`);
}
if (toUpdate.length > 0) {
const chunkSize = 100;
for (let i = 0; i < toUpdate.length; i += chunkSize) {
const chunk = toUpdate.slice(i, i + chunkSize);
await prisma.$transaction(
chunk.map(u =>
prisma.generationMix.update({
where: { id: u.id },
data: { generationMw: u.generationMw },
}),
),
);
}
log(` ${regionCode}: ${toUpdate.length} generation records updated`);
}
if (toCreate.length === 0 && toUpdate.length === 0) {
log(` ${regionCode}: no changes needed`);
}
} catch (err) {
log(` ERROR ${regionCode}: ${err instanceof Error ? err.message : String(err)}`);
}
await sleep(200);
}
}
// ---------------------------------------------------------------------------
// Commodity prices backfill
// ---------------------------------------------------------------------------
interface CommodityRow {
commodity: string;
price: number;
unit: string;
timestamp: Date;
source: string;
}
async function backfillCommodities(): Promise<void> {
log('=== Backfilling commodity prices ===');
const start = sixMonthsAgoIso();
const end = todayIso();
const rows: CommodityRow[] = [];
// EIA: Natural Gas (Henry Hub)
log(' Fetching EIA natural gas prices...');
try {
const gasData = await eia.getNaturalGasPrice({ start, end });
for (const p of gasData) {
if (p.price === null) continue;
rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
}
log(` EIA natural gas: ${gasData.length} raw records`);
} catch (err) {
log(` ERROR EIA natural gas: ${err instanceof Error ? err.message : String(err)}`);
}
await sleep(200);
// EIA: WTI Crude
log(' Fetching EIA WTI crude prices...');
try {
const oilData = await eia.getWTICrudePrice({ start, end });
for (const p of oilData) {
if (p.price === null) continue;
rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
}
log(` EIA WTI crude: ${oilData.length} raw records`);
} catch (err) {
log(` ERROR EIA WTI crude: ${err instanceof Error ? err.message : String(err)}`);
}
await sleep(200);
// FRED: Natural Gas (DHHNGSP)
log(' Fetching FRED natural gas prices...');
const fredGas = await fred.getNaturalGasPrice(start, end);
if (fredGas.ok) {
for (const p of fredGas.data) {
rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
}
log(` FRED natural gas: ${fredGas.data.length} records`);
} else {
log(` ERROR FRED natural gas: ${fredGas.error}`);
}
await sleep(200);
// FRED: WTI Crude (DCOILWTICO)
log(' Fetching FRED WTI crude prices...');
const fredOil = await fred.getWTICrudePrice(start, end);
if (fredOil.ok) {
for (const p of fredOil.data) {
rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
}
log(` FRED WTI crude: ${fredOil.data.length} records`);
} else {
log(` ERROR FRED WTI crude: ${fredOil.error}`);
}
await sleep(200);
// FRED: Coal (PCOALAUUSDM)
log(' Fetching FRED coal prices...');
const fredCoal = await fred.getCoalPrice(start, end);
if (fredCoal.ok) {
for (const p of fredCoal.data) {
rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
}
log(` FRED coal: ${fredCoal.data.length} records`);
} else {
log(` ERROR FRED coal: ${fredCoal.error}`);
}
if (rows.length === 0) {
log(' No commodity data fetched');
return;
}
// Deduplicate: for same commodity + timestamp, prefer EIA over FRED
const deduped = new Map<string, CommodityRow>();
for (const row of rows) {
const key = `${row.commodity}:${row.timestamp.getTime()}`;
const existing = deduped.get(key);
if (!existing || (existing.source === 'FRED' && row.source === 'EIA')) {
deduped.set(key, row);
}
}
const uniqueRows = [...deduped.values()];
// Upsert into database
const timestamps = uniqueRows.map(r => r.timestamp);
const commodities = [...new Set(uniqueRows.map(r => r.commodity))];
const existing = await prisma.commodityPrice.findMany({
where: { commodity: { in: commodities }, timestamp: { in: timestamps } },
select: { id: true, commodity: true, timestamp: true },
});
const existingKeys = new Map(existing.map(e => [`${e.commodity}:${e.timestamp.getTime()}`, e.id]));
const toCreate: Array<{ commodity: string; price: number; unit: string; timestamp: Date; source: string }> = [];
const toUpdate: Array<{ id: string; price: number; unit: string; source: string }> = [];
for (const row of uniqueRows) {
const key = `${row.commodity}:${row.timestamp.getTime()}`;
const existingId = existingKeys.get(key);
if (existingId) {
toUpdate.push({ id: existingId, price: row.price, unit: row.unit, source: row.source });
} else {
toCreate.push({
commodity: row.commodity,
price: row.price,
unit: row.unit,
timestamp: row.timestamp,
source: row.source,
});
}
}
if (toCreate.length > 0) {
const result = await prisma.commodityPrice.createMany({ data: toCreate });
log(` Commodities: ${result.count} records inserted`);
}
if (toUpdate.length > 0) {
const chunkSize = 100;
for (let i = 0; i < toUpdate.length; i += chunkSize) {
const chunk = toUpdate.slice(i, i + chunkSize);
await prisma.$transaction(
chunk.map(u =>
prisma.commodityPrice.update({
where: { id: u.id },
data: { price: u.price, unit: u.unit, source: u.source },
}),
),
);
}
log(` Commodities: ${toUpdate.length} records updated`);
}
if (toCreate.length === 0 && toUpdate.length === 0) {
log(' Commodities: no changes needed');
}
}
// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------
async function main(): Promise<void> {
log('Starting historical backfill (6 months)...');
log(`Date range: ${sixMonthsAgoIso()} to ${todayIso()}`);
log('');
await backfillElectricity();
log('');
await backfillGeneration();
log('');
await backfillCommodities();
log('');
log('Backfill complete.');
}
main()
.catch(err => {
console.error('Backfill failed:', err);
process.exit(1);
})
.finally(() => {
void prisma.$disconnect();
});