/**
 * Historical data backfill script.
 *
 * Populates 6 months of historical data from EIA and FRED into Postgres.
 * Idempotent — safe to re-run; existing records are updated, not duplicated.
 *
 * Usage: bun run scripts/backfill.ts
 */
import 'dotenv/config';

import { PrismaPg } from '@prisma/adapter-pg';

import { PrismaClient } from '../src/generated/prisma/client.js';
import * as eia from '../src/lib/api/eia.js';
import { getFuelTypeData, getRegionData } from '../src/lib/api/eia.js';
import * as fred from '../src/lib/api/fred.js';
import type { RegionCode } from '../src/lib/schemas/electricity.js';

const adapter = new PrismaPg({ connectionString: process.env.DATABASE_URL });
const prisma = new PrismaClient({ adapter });

/** Region codes processed by the electricity and generation backfills. */
const ALL_REGIONS: RegionCode[] = ['PJM', 'ERCOT', 'CAISO', 'NYISO', 'ISONE', 'MISO', 'SPP'];

/** Length of the backfill window; "six months" is approximated as 6 * 30 = 180 days. */
const SIX_MONTHS_MS = 6 * 30 * 24 * 60 * 60 * 1000;

/** Pause inserted between consecutive upstream API requests (rate limiting). */
const REQUEST_DELAY_MS = 200;

/** Number of row updates sent per transaction, to avoid transaction timeouts. */
const UPDATE_CHUNK_SIZE = 100;

/** Returns the start of the backfill window as a `YYYY-MM-DD` string (UTC). */
function sixMonthsAgoIso(): string {
  return new Date(Date.now() - SIX_MONTHS_MS).toISOString().slice(0, 10);
}

/** Returns the current date as a `YYYY-MM-DD` string (UTC). */
function todayIso(): string {
  return new Date().toISOString().slice(0, 10);
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

/** Writes `msg` to stdout prefixed with the current UTC time (`HH:MM:SS`). */
function log(msg: string): void {
  const ts = new Date().toISOString().slice(11, 19);
  console.log(`[${ts}] ${msg}`);
}

/** Extracts a printable message from a caught value of unknown type. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/**
 * Splits `items` into consecutive slices of at most `size` elements.
 *
 * @throws RangeError if `size` is not a positive integer.
 */
function chunked<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

/**
 * Returns one `Date` per distinct instant found in `points`.
 *
 * Used to build `IN (...)` filters: several points may share an instant
 * (one per fuel type or commodity), and repeating it only inflates the
 * number of query parameters.
 */
function distinctTimestamps(points: ReadonlyArray<{ timestamp: Date }>): Date[] {
  const instants = new Set(points.map(p => p.timestamp.getTime()));
  return [...instants].map(ms => new Date(ms));
}

// ---------------------------------------------------------------------------
// Electricity demand backfill
// ---------------------------------------------------------------------------

/**
 * Fetches demand data for every region in `ALL_REGIONS` over the backfill
 * window and writes it to `electricityPrice`.
 *
 * Rows that already exist for a (region, timestamp) pair get their `demandMw`
 * and `source` updated; all other points are inserted. Regions without a
 * `gridRegion` row are skipped. A failure in one region is logged and does
 * not stop the remaining regions.
 */
async function backfillElectricity(): Promise<void> {
  log('=== Backfilling electricity demand data ===');

  const gridRegions = await prisma.gridRegion.findMany({
    select: { id: true, code: true },
  });
  const regionIdByCode = new Map<string, string>(gridRegions.map(r => [r.code, r.id]));

  const start = sixMonthsAgoIso();
  const end = todayIso();

  for (const regionCode of ALL_REGIONS) {
    const regionId = regionIdByCode.get(regionCode);
    if (!regionId) {
      log(` SKIP ${regionCode} — no grid_region row found`);
      continue;
    }

    log(` Fetching demand for ${regionCode}...`);

    try {
      const demandData = await getRegionData(regionCode, 'D', { start, end });
      const validPoints = demandData.filter(
        (p): p is typeof p & { valueMw: number } => p.valueMw !== null,
      );

      if (validPoints.length === 0) {
        log(` ${regionCode}: 0 valid data points`);
        continue;
      }

      // Check existing records to decide create vs update
      const existing = await prisma.electricityPrice.findMany({
        where: { regionId, timestamp: { in: distinctTimestamps(validPoints) } },
        select: { id: true, timestamp: true },
      });
      const existingByTime = new Map<number, string>(
        existing.map(e => [e.timestamp.getTime(), e.id]),
      );

      const toCreate: Array<{
        regionId: string;
        priceMwh: number;
        demandMw: number;
        timestamp: Date;
        source: string;
      }> = [];
      const toUpdate: Array<{ id: string; demandMw: number }> = [];

      for (const point of validPoints) {
        const existingId = existingByTime.get(point.timestamp.getTime());
        if (existingId) {
          toUpdate.push({ id: existingId, demandMw: point.valueMw });
        } else {
          toCreate.push({
            regionId,
            priceMwh: 0, // TODO: No real-time wholesale price available from EIA
            demandMw: point.valueMw,
            timestamp: point.timestamp,
            source: 'EIA',
          });
        }
      }

      if (toCreate.length > 0) {
        const result = await prisma.electricityPrice.createMany({ data: toCreate });
        log(` ${regionCode}: ${result.count} records inserted`);
      }

      if (toUpdate.length > 0) {
        // Batch updates in chunks to avoid transaction timeouts
        for (const batch of chunked(toUpdate, UPDATE_CHUNK_SIZE)) {
          await prisma.$transaction(
            batch.map(u =>
              prisma.electricityPrice.update({
                where: { id: u.id },
                data: { demandMw: u.demandMw, source: 'EIA' },
              }),
            ),
          );
        }
        log(` ${regionCode}: ${toUpdate.length} records updated`);
      }

      if (toCreate.length === 0 && toUpdate.length === 0) {
        log(` ${regionCode}: no changes needed`);
      }
    } catch (err) {
      log(` ERROR ${regionCode}: ${errorMessage(err)}`);
    } finally {
      // Rate limit between regions. In `finally` so the pause also happens
      // on the early `continue` above, after a request has been made.
      await sleep(REQUEST_DELAY_MS);
    }
  }
}

// ---------------------------------------------------------------------------
// Generation mix backfill
// ---------------------------------------------------------------------------

/**
 * Fetches per-fuel-type generation data for every region in `ALL_REGIONS`
 * over the backfill window and writes it to `generationMix`.
 *
 * Rows are matched on (region, fuel type, timestamp): matches get their
 * `generationMw` updated, everything else is inserted. Regions without a
 * `gridRegion` row are skipped. A failure in one region is logged and does
 * not stop the remaining regions.
 */
async function backfillGeneration(): Promise<void> {
  log('=== Backfilling generation mix data ===');

  const gridRegions = await prisma.gridRegion.findMany({
    select: { id: true, code: true },
  });
  const regionIdByCode = new Map<string, string>(gridRegions.map(r => [r.code, r.id]));

  const start = sixMonthsAgoIso();
  const end = todayIso();

  for (const regionCode of ALL_REGIONS) {
    const regionId = regionIdByCode.get(regionCode);
    if (!regionId) {
      log(` SKIP ${regionCode} — no grid_region row found`);
      continue;
    }

    log(` Fetching generation mix for ${regionCode}...`);

    try {
      const fuelData = await getFuelTypeData(regionCode, { start, end });
      const validPoints = fuelData.filter(
        (p): p is typeof p & { generationMw: number } => p.generationMw !== null,
      );

      if (validPoints.length === 0) {
        log(` ${regionCode}: 0 valid data points`);
        continue;
      }

      // Each instant appears once per fuel type, so de-duplicate before
      // building the IN filter.
      const existing = await prisma.generationMix.findMany({
        where: { regionId, timestamp: { in: distinctTimestamps(validPoints) } },
        select: { id: true, timestamp: true, fuelType: true },
      });
      const existingKeys = new Map<string, string>(
        existing.map(e => [`${e.fuelType}:${e.timestamp.getTime()}`, e.id]),
      );

      const toCreate: Array<{
        regionId: string;
        fuelType: string;
        generationMw: number;
        timestamp: Date;
      }> = [];
      const toUpdate: Array<{ id: string; generationMw: number }> = [];

      for (const point of validPoints) {
        const key = `${point.fuelType}:${point.timestamp.getTime()}`;
        const existingId = existingKeys.get(key);
        if (existingId) {
          toUpdate.push({ id: existingId, generationMw: point.generationMw });
        } else {
          toCreate.push({
            regionId,
            fuelType: point.fuelType,
            generationMw: point.generationMw,
            timestamp: point.timestamp,
          });
        }
      }

      if (toCreate.length > 0) {
        const result = await prisma.generationMix.createMany({ data: toCreate });
        log(` ${regionCode}: ${result.count} generation records inserted`);
      }

      if (toUpdate.length > 0) {
        for (const batch of chunked(toUpdate, UPDATE_CHUNK_SIZE)) {
          await prisma.$transaction(
            batch.map(u =>
              prisma.generationMix.update({
                where: { id: u.id },
                data: { generationMw: u.generationMw },
              }),
            ),
          );
        }
        log(` ${regionCode}: ${toUpdate.length} generation records updated`);
      }

      if (toCreate.length === 0 && toUpdate.length === 0) {
        log(` ${regionCode}: no changes needed`);
      }
    } catch (err) {
      log(` ERROR ${regionCode}: ${errorMessage(err)}`);
    } finally {
      // Rate limit between regions, including after an early `continue`.
      await sleep(REQUEST_DELAY_MS);
    }
  }
}

// ---------------------------------------------------------------------------
// Commodity prices backfill
// ---------------------------------------------------------------------------

/** One commodity price observation, in the shape written to `commodityPrice`. */
interface CommodityRow {
  commodity: string;
  price: number;
  unit: string;
  timestamp: Date;
  source: string;
}

/**
 * Fetches commodity prices (EIA natural gas and WTI crude; FRED natural gas,
 * WTI crude and coal) over the backfill window and writes them to
 * `commodityPrice`.
 *
 * When two sources report the same commodity at the same timestamp, the EIA
 * row replaces a FRED row. Rows are then matched against the database on
 * (commodity, timestamp): matches are updated, the rest inserted. EIA fetch
 * errors are caught and logged; FRED failures are reported through the
 * `ok`/`error` result returned by the client.
 */
async function backfillCommodities(): Promise<void> {
  log('=== Backfilling commodity prices ===');

  const start = sixMonthsAgoIso();
  const end = todayIso();
  const rows: CommodityRow[] = [];

  // EIA: Natural Gas (Henry Hub)
  log(' Fetching EIA natural gas prices...');
  try {
    const gasData = await eia.getNaturalGasPrice({ start, end });
    for (const p of gasData) {
      if (p.price === null) continue;
      rows.push({
        commodity: p.commodity,
        price: p.price,
        unit: p.unit,
        timestamp: p.timestamp,
        source: p.source,
      });
    }
    log(` EIA natural gas: ${gasData.length} raw records`);
  } catch (err) {
    log(` ERROR EIA natural gas: ${errorMessage(err)}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // EIA: WTI Crude
  log(' Fetching EIA WTI crude prices...');
  try {
    const oilData = await eia.getWTICrudePrice({ start, end });
    for (const p of oilData) {
      if (p.price === null) continue;
      rows.push({
        commodity: p.commodity,
        price: p.price,
        unit: p.unit,
        timestamp: p.timestamp,
        source: p.source,
      });
    }
    log(` EIA WTI crude: ${oilData.length} raw records`);
  } catch (err) {
    log(` ERROR EIA WTI crude: ${errorMessage(err)}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // FRED: Natural Gas (DHHNGSP)
  log(' Fetching FRED natural gas prices...');
  const fredGas = await fred.getNaturalGasPrice(start, end);
  if (fredGas.ok) {
    for (const p of fredGas.data) {
      rows.push({
        commodity: p.commodity,
        price: p.price,
        unit: p.unit,
        timestamp: p.timestamp,
        source: p.source,
      });
    }
    log(` FRED natural gas: ${fredGas.data.length} records`);
  } else {
    log(` ERROR FRED natural gas: ${fredGas.error}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // FRED: WTI Crude (DCOILWTICO)
  log(' Fetching FRED WTI crude prices...');
  const fredOil = await fred.getWTICrudePrice(start, end);
  if (fredOil.ok) {
    for (const p of fredOil.data) {
      rows.push({
        commodity: p.commodity,
        price: p.price,
        unit: p.unit,
        timestamp: p.timestamp,
        source: p.source,
      });
    }
    log(` FRED WTI crude: ${fredOil.data.length} records`);
  } else {
    log(` ERROR FRED WTI crude: ${fredOil.error}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // FRED: Coal (PCOALAUUSDM)
  log(' Fetching FRED coal prices...');
  const fredCoal = await fred.getCoalPrice(start, end);
  if (fredCoal.ok) {
    for (const p of fredCoal.data) {
      rows.push({
        commodity: p.commodity,
        price: p.price,
        unit: p.unit,
        timestamp: p.timestamp,
        source: p.source,
      });
    }
    log(` FRED coal: ${fredCoal.data.length} records`);
  } else {
    log(` ERROR FRED coal: ${fredCoal.error}`);
  }

  if (rows.length === 0) {
    log(' No commodity data fetched');
    return;
  }

  // Deduplicate: for same commodity + timestamp, prefer EIA over FRED
  const deduped = new Map<string, CommodityRow>();
  for (const row of rows) {
    const key = `${row.commodity}:${row.timestamp.getTime()}`;
    const existing = deduped.get(key);
    if (!existing || (existing.source === 'FRED' && row.source === 'EIA')) {
      deduped.set(key, row);
    }
  }
  const uniqueRows = [...deduped.values()];

  // Upsert into database. The filter is a superset (any listed commodity at
  // any listed instant); exact matching is done through `existingKeys`.
  const commodities = [...new Set(uniqueRows.map(r => r.commodity))];
  const existing = await prisma.commodityPrice.findMany({
    where: {
      commodity: { in: commodities },
      timestamp: { in: distinctTimestamps(uniqueRows) },
    },
    select: { id: true, commodity: true, timestamp: true },
  });
  const existingKeys = new Map<string, string>(
    existing.map(e => [`${e.commodity}:${e.timestamp.getTime()}`, e.id]),
  );

  const toCreate: CommodityRow[] = [];
  const toUpdate: Array<{ id: string; price: number; unit: string; source: string }> = [];

  for (const row of uniqueRows) {
    const key = `${row.commodity}:${row.timestamp.getTime()}`;
    const existingId = existingKeys.get(key);
    if (existingId) {
      toUpdate.push({ id: existingId, price: row.price, unit: row.unit, source: row.source });
    } else {
      toCreate.push({
        commodity: row.commodity,
        price: row.price,
        unit: row.unit,
        timestamp: row.timestamp,
        source: row.source,
      });
    }
  }

  if (toCreate.length > 0) {
    const result = await prisma.commodityPrice.createMany({ data: toCreate });
    log(` Commodities: ${result.count} records inserted`);
  }

  if (toUpdate.length > 0) {
    for (const batch of chunked(toUpdate, UPDATE_CHUNK_SIZE)) {
      await prisma.$transaction(
        batch.map(u =>
          prisma.commodityPrice.update({
            where: { id: u.id },
            data: { price: u.price, unit: u.unit, source: u.source },
          }),
        ),
      );
    }
    log(` Commodities: ${toUpdate.length} records updated`);
  }

  if (toCreate.length === 0 && toUpdate.length === 0) {
    log(' Commodities: no changes needed');
  }
}

// ---------------------------------------------------------------------------
// Main
// ---------------------------------------------------------------------------

/** Runs the electricity, generation and commodity backfills in sequence. */
async function main(): Promise<void> {
  log('Starting historical backfill (6 months)...');
  log(`Date range: ${sixMonthsAgoIso()} to ${todayIso()}`);
  log('');

  await backfillElectricity();
  log('');

  await backfillGeneration();
  log('');

  await backfillCommodities();
  log('');

  log('Backfill complete.');
}

/**
 * Script entry point: runs `main`, reports a failure through the process
 * exit code, and always waits for the database connection to close.
 *
 * `process.exitCode` is used instead of `process.exit()` so that the
 * `finally` block can finish disconnecting before the process ends.
 */
async function run(): Promise<void> {
  try {
    await main();
  } catch (err) {
    console.error('Backfill failed:', err);
    process.exitCode = 1;
  } finally {
    try {
      await prisma.$disconnect();
    } catch (err) {
      console.error('Failed to disconnect from database:', err);
    }
  }
}

void run();