- Add 7 new grid regions (BPA, DUKE, SOCO, TVA, FPC, WAPA, NWMT) to cover the entire continental US
- Expand datacenters from 108 to 292 facilities across 39 operators
- Add EIA power plant pipeline: download script, 3,546 plants >= 50 MW with diamond map markers
- Rewrite backfill script for 10-year data (from 2015-07-01) with quarterly/monthly chunking, 3-region parallelism, and resumability
- Add materialized views (daily/weekly) with server-side granularity selection for chart performance
- Fix map UX: z-index tooltips, disable POI clicks, move legend via MapControl

715 lines · 23 KiB · TypeScript
/**
 * Historical data backfill script (10-year).
 *
 * Populates ~10 years of historical data (from 2015-07-01) from EIA and FRED
 * into Postgres. Uses time-chunked requests to stay under EIA's 5,000-row
 * pagination limit, with concurrent region fetching and resumability.
 *
 * Idempotent — safe to re-run; uses ON CONFLICT upserts.
 *
 * Usage: bun run scripts/backfill.ts
 *        bun run scripts/backfill.ts --skip-demand --skip-generation
 *        bun run scripts/backfill.ts --only-commodities
 */
|
|
|
|
import 'dotenv/config';
|
|
|
|
import { PrismaPg } from '@prisma/adapter-pg';
|
|
import { PrismaClient } from '../src/generated/prisma/client.js';
|
|
|
|
import * as eia from '../src/lib/api/eia.js';
|
|
import { getRetailElectricityPrices } from '../src/lib/api/eia.js';
|
|
import * as fred from '../src/lib/api/fred.js';
|
|
import { type RegionCode } from '../src/lib/schemas/electricity.js';
|
|
|
|
// Prisma client backed by the node-postgres driver adapter.
// NOTE(review): assumes DATABASE_URL is set (loaded via dotenv/config above);
// PrismaPg will fail at connect time otherwise — confirm in deployment env.
const adapter = new PrismaPg({ connectionString: process.env.DATABASE_URL });
const prisma = new PrismaClient({ adapter });
|
|
|
|
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/** EIA RTO hourly data begins around 2015-07 for most ISOs */
const BACKFILL_START = '2015-07-01';

// Region codes to backfill: the original 7 ISOs/RTOs plus the 7 regions
// added to cover the rest of the continental US.
const ALL_REGIONS: RegionCode[] = [
  'PJM',
  'ERCOT',
  'CAISO',
  'NYISO',
  'ISONE',
  'MISO',
  'SPP',
  'BPA',
  'DUKE',
  'SOCO',
  'TVA',
  'FPC',
  'WAPA',
  'NWMT',
];

/** Number of regions to fetch concurrently */
const CONCURRENCY = 3;

/** Minimum delay between sequential API requests (ms) */
const REQUEST_DELAY_MS = 200;

/** DB upsert batch size */
const UPSERT_BATCH_SIZE = 2000;
|
|
|
|
// ---------------------------------------------------------------------------
// CLI flags
// ---------------------------------------------------------------------------

// Presence-only flags; no flag takes a value.
const args = new Set(process.argv.slice(2));
const skipDemand = args.has('--skip-demand'); // skip the demand/price phase
const skipGeneration = args.has('--skip-generation'); // skip the generation-mix phase
const skipCommodities = args.has('--skip-commodities'); // skip the commodity phase
const onlyCommodities = args.has('--only-commodities'); // run ONLY the commodity phase
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
function sleep(ms: number): Promise<void> {
|
|
return new Promise(resolve => setTimeout(resolve, ms));
|
|
}
|
|
|
|
function log(msg: string): void {
|
|
const ts = new Date().toISOString().slice(11, 19);
|
|
console.log(`[${ts}] ${msg}`);
|
|
}
|
|
|
|
function todayIso(): string {
|
|
return new Date().toISOString().slice(0, 10);
|
|
}
|
|
|
|
/** Generate quarterly date ranges: [start, end] pairs as YYYY-MM-DD strings */
|
|
function generateQuarterChunks(startDate: string, endDate: string): Array<[string, string]> {
|
|
const chunks: Array<[string, string]> = [];
|
|
const start = new Date(`${startDate}T00:00:00Z`);
|
|
const end = new Date(`${endDate}T00:00:00Z`);
|
|
|
|
const cursor = new Date(start);
|
|
while (cursor < end) {
|
|
const chunkStart = cursor.toISOString().slice(0, 10);
|
|
// Advance 3 months
|
|
cursor.setUTCMonth(cursor.getUTCMonth() + 3);
|
|
const chunkEnd = cursor < end ? cursor.toISOString().slice(0, 10) : endDate;
|
|
chunks.push([chunkStart, chunkEnd]);
|
|
}
|
|
|
|
return chunks;
|
|
}
|
|
|
|
/** Generate monthly date ranges: [start, end] pairs as YYYY-MM-DD strings */
|
|
function generateMonthChunks(startDate: string, endDate: string): Array<[string, string]> {
|
|
const chunks: Array<[string, string]> = [];
|
|
const start = new Date(`${startDate}T00:00:00Z`);
|
|
const end = new Date(`${endDate}T00:00:00Z`);
|
|
|
|
const cursor = new Date(start);
|
|
while (cursor < end) {
|
|
const chunkStart = cursor.toISOString().slice(0, 10);
|
|
cursor.setUTCMonth(cursor.getUTCMonth() + 1);
|
|
const chunkEnd = cursor < end ? cursor.toISOString().slice(0, 10) : endDate;
|
|
chunks.push([chunkStart, chunkEnd]);
|
|
}
|
|
|
|
return chunks;
|
|
}
|
|
|
|
/** Format a quarter label like "Q3 2015" */
|
|
function quarterLabel(dateStr: string): string {
|
|
const d = new Date(`${dateStr}T00:00:00Z`);
|
|
const q = Math.floor(d.getUTCMonth() / 3) + 1;
|
|
return `Q${q} ${d.getUTCFullYear()}`;
|
|
}
|
|
|
|
/** Format a month label like "Jul 2015" */
|
|
function monthLabel(dateStr: string): string {
|
|
const d = new Date(`${dateStr}T00:00:00Z`);
|
|
return d.toLocaleString('en-US', { month: 'short', year: 'numeric', timeZone: 'UTC' });
|
|
}
|
|
|
|
/** Run async tasks with limited concurrency */
|
|
async function runWithConcurrency<T>(tasks: Array<() => Promise<T>>, limit: number): Promise<T[]> {
|
|
const results: T[] = [];
|
|
let index = 0;
|
|
|
|
async function worker(): Promise<void> {
|
|
while (index < tasks.length) {
|
|
const currentIndex = index++;
|
|
const task = tasks[currentIndex]!;
|
|
results[currentIndex] = await task();
|
|
}
|
|
}
|
|
|
|
const workers = Array.from({ length: Math.min(limit, tasks.length) }, () => worker());
|
|
await Promise.all(workers);
|
|
return results;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Progress tracker — stores completed region+chunk combos in memory
|
|
// ---------------------------------------------------------------------------
|
|
|
|
const completedChunks = new Set<string>();
|
|
|
|
function chunkKey(phase: string, region: string, chunkStart: string): string {
|
|
return `${phase}:${region}:${chunkStart}`;
|
|
}
|
|
|
|
function isChunkDone(phase: string, region: string, chunkStart: string): boolean {
|
|
return completedChunks.has(chunkKey(phase, region, chunkStart));
|
|
}
|
|
|
|
function markChunkDone(phase: string, region: string, chunkStart: string): void {
|
|
completedChunks.add(chunkKey(phase, region, chunkStart));
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
// Stats
// ---------------------------------------------------------------------------

/** Running totals accumulated across all phases for the final summary. */
interface BackfillStats {
  demandInserted: number; // new electricity_prices rows inserted
  demandUpdated: number; // existing electricity_prices rows overwritten via ON CONFLICT
  demandErrors: number; // failed demand chunks (logged, not fatal)
  genInserted: number; // new generation_mix rows inserted
  genUpdated: number; // existing generation_mix rows overwritten
  genErrors: number; // failed generation chunks (logged, not fatal)
  commodityInserted: number; // new commodity_prices rows inserted
  commodityUpdated: number; // existing commodity_prices rows overwritten
}

// Mutable module-level accumulator; updated in place by the backfill workers.
const stats: BackfillStats = {
  demandInserted: 0,
  demandUpdated: 0,
  demandErrors: 0,
  genInserted: 0,
  genUpdated: 0,
  genErrors: 0,
  commodityInserted: 0,
  commodityUpdated: 0,
};
|
|
|
|
// ---------------------------------------------------------------------------
// Electricity demand backfill — chunked by quarter
// ---------------------------------------------------------------------------

/**
 * Backfill demand + derived price rows for one region, one quarterly chunk
 * at a time. Chunks already marked done during this run are skipped; a
 * failed chunk increments stats.demandErrors and processing continues.
 *
 * Price synthesis: each point's priceMwh is the region's monthly retail base
 * price scaled into a 0.8x–1.2x band according to the point's demand
 * relative to the chunk's peak demand (higher demand → higher price).
 *
 * @param regionCode EIA region code (e.g. 'PJM')
 * @param regionId grid_regions.id for this region
 * @param chunks [start, end] date ranges (YYYY-MM-DD) to fetch
 * @param retailPrices base $/MWh keyed by "REGION:YYYY-MM"
 * @param latestPriceByRegion fallback $/MWh per region when no month match
 */
async function backfillDemandForRegion(
  regionCode: RegionCode,
  regionId: string,
  chunks: Array<[string, string]>,
  retailPrices: Map<string, number>,
  latestPriceByRegion: Map<string, number>,
): Promise<void> {
  for (let i = 0; i < chunks.length; i++) {
    const [start, end] = chunks[i]!;
    const label = `[DEMAND] ${regionCode}: ${quarterLabel(start)} (${i + 1}/${chunks.length})`;

    // Resumability: skip chunks completed earlier in this process.
    if (isChunkDone('demand', regionCode, start)) {
      continue;
    }

    try {
      log(` ${label} — fetching...`);
      // 'D' appears to select the demand series — see eia.getRegionData.
      const demandData = await eia.getRegionData(regionCode, 'D', { start, end });
      // Drop points with no demand value; the predicate narrows valueMw to number.
      const validPoints = demandData.filter((p): p is typeof p & { valueMw: number } => p.valueMw !== null);

      if (validPoints.length === 0) {
        log(` ${label} — 0 data points, skipping`);
        // Mark done so a later pass doesn't re-request a known-empty chunk.
        markChunkDone('demand', regionCode, start);
        await sleep(REQUEST_DELAY_MS);
        continue;
      }

      // Compute peak demand for price variation within this chunk
      const peakDemand = Math.max(...validPoints.map(p => p.valueMw));

      // Build upsert rows
      const rows = validPoints.map(point => {
        const month = point.timestamp.toISOString().slice(0, 7);
        // Prefer the exact region-month price, then the region fallback, then 0.
        const basePrice = retailPrices.get(`${regionCode}:${month}`) ?? latestPriceByRegion.get(regionCode) ?? 0;
        const demandRatio = peakDemand > 0 ? point.valueMw / peakDemand : 0.5;
        // Scale into the 0.8x–1.2x band; 0 means "no price available".
        const priceMwh = basePrice > 0 ? basePrice * (0.8 + 0.4 * demandRatio) : 0;

        return {
          regionId,
          priceMwh,
          demandMw: point.valueMw,
          timestamp: point.timestamp,
        };
      });

      // Batch upsert using raw SQL for speed
      let inserted = 0;
      let updated = 0;
      for (let j = 0; j < rows.length; j += UPSERT_BATCH_SIZE) {
        const batch = rows.slice(j, j + UPSERT_BATCH_SIZE);
        const result = await upsertDemandBatch(batch);
        inserted += result.inserted;
        updated += result.updated;
      }

      stats.demandInserted += inserted;
      stats.demandUpdated += updated;
      log(` ${label} — ${inserted} inserted, ${updated} updated (${validPoints.length} points)`);
      markChunkDone('demand', regionCode, start);
    } catch (err) {
      stats.demandErrors++;
      log(` ${label} — ERROR: ${err instanceof Error ? err.message : String(err)}`);
    }

    // Throttle between chunk requests to stay under API rate limits.
    await sleep(REQUEST_DELAY_MS);
  }
}
|
|
|
|
interface UpsertResult {
|
|
inserted: number;
|
|
updated: number;
|
|
}
|
|
|
|
async function upsertDemandBatch(
|
|
rows: Array<{ regionId: string; priceMwh: number; demandMw: number; timestamp: Date }>,
|
|
): Promise<UpsertResult> {
|
|
if (rows.length === 0) return { inserted: 0, updated: 0 };
|
|
|
|
// Build VALUES clause with parameterized placeholders
|
|
const values: unknown[] = [];
|
|
const placeholders: string[] = [];
|
|
|
|
for (let i = 0; i < rows.length; i++) {
|
|
const row = rows[i]!;
|
|
const offset = i * 4;
|
|
placeholders.push(
|
|
`(gen_random_uuid(), $${offset + 1}::uuid, $${offset + 2}, $${offset + 3}, $${offset + 4}::timestamptz, 'EIA')`,
|
|
);
|
|
values.push(row.regionId, row.priceMwh, row.demandMw, row.timestamp);
|
|
}
|
|
|
|
const sql = `
|
|
WITH upserted AS (
|
|
INSERT INTO electricity_prices (id, region_id, price_mwh, demand_mw, timestamp, source)
|
|
VALUES ${placeholders.join(',\n')}
|
|
ON CONFLICT (region_id, timestamp) DO UPDATE SET
|
|
price_mwh = EXCLUDED.price_mwh,
|
|
demand_mw = EXCLUDED.demand_mw,
|
|
source = EXCLUDED.source
|
|
RETURNING (xmax = 0) AS is_insert
|
|
)
|
|
SELECT
|
|
COUNT(*) FILTER (WHERE is_insert) AS inserted,
|
|
COUNT(*) FILTER (WHERE NOT is_insert) AS updated
|
|
FROM upserted
|
|
`;
|
|
|
|
const result = await prisma.$queryRawUnsafe<Array<{ inserted: bigint; updated: bigint }>>(sql, ...values);
|
|
const row = result[0]!;
|
|
return { inserted: Number(row.inserted), updated: Number(row.updated) };
|
|
}
|
|
|
|
async function backfillElectricity(): Promise<void> {
|
|
log('=== Backfilling electricity demand + price data (10-year) ===');
|
|
|
|
const gridRegions = await prisma.gridRegion.findMany({
|
|
select: { id: true, code: true },
|
|
});
|
|
const regionIdByCode = new Map(gridRegions.map(r => [r.code, r.id]));
|
|
|
|
const end = todayIso();
|
|
const chunks = generateQuarterChunks(BACKFILL_START, end);
|
|
log(` ${chunks.length} quarterly chunks from ${BACKFILL_START} to ${end}`);
|
|
|
|
// Fetch retail prices upfront (one call covers all months + all states)
|
|
const retailPriceByRegionMonth = new Map<string, number>();
|
|
log(' Fetching retail electricity prices...');
|
|
try {
|
|
const startMonth = BACKFILL_START.slice(0, 7);
|
|
const endMonth = end.slice(0, 7);
|
|
const retailPrices = await getRetailElectricityPrices({ start: startMonth, end: endMonth });
|
|
for (const rp of retailPrices) {
|
|
retailPriceByRegionMonth.set(`${rp.regionCode}:${rp.period}`, rp.priceMwh);
|
|
}
|
|
log(` Retail prices: ${retailPrices.length} records for ${retailPriceByRegionMonth.size} region-months`);
|
|
} catch (err) {
|
|
log(` ERROR fetching retail prices: ${err instanceof Error ? err.message : String(err)}`);
|
|
}
|
|
|
|
// Build fallback: latest known price per region
|
|
const latestPriceByRegion = new Map<string, number>();
|
|
for (const [key, price] of retailPriceByRegionMonth) {
|
|
const region = key.split(':')[0]!;
|
|
const existing = latestPriceByRegion.get(region);
|
|
if (!existing || key > `${region}:${existing}`) {
|
|
latestPriceByRegion.set(region, price);
|
|
}
|
|
}
|
|
|
|
await sleep(REQUEST_DELAY_MS);
|
|
|
|
// Build tasks for each region
|
|
const regionTasks = ALL_REGIONS.map(regionCode => {
|
|
return async () => {
|
|
const regionId = regionIdByCode.get(regionCode);
|
|
if (!regionId) {
|
|
log(` SKIP ${regionCode} — no grid_region row found`);
|
|
return;
|
|
}
|
|
await backfillDemandForRegion(regionCode, regionId, chunks, retailPriceByRegionMonth, latestPriceByRegion);
|
|
};
|
|
});
|
|
|
|
await runWithConcurrency(regionTasks, CONCURRENCY);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
// Generation mix backfill — chunked by month
// ---------------------------------------------------------------------------

/**
 * Backfill generation-mix rows (per fuel type) for one region, one monthly
 * chunk at a time. Chunks marked done during this run are skipped; a failed
 * chunk increments stats.genErrors and processing continues.
 *
 * @param regionCode EIA region code (e.g. 'PJM')
 * @param regionId grid_regions.id for this region
 * @param chunks [start, end] date ranges (YYYY-MM-DD) to fetch
 */
async function backfillGenerationForRegion(
  regionCode: RegionCode,
  regionId: string,
  chunks: Array<[string, string]>,
): Promise<void> {
  for (let i = 0; i < chunks.length; i++) {
    const [start, end] = chunks[i]!;
    const label = `[GEN] ${regionCode}: ${monthLabel(start)} (${i + 1}/${chunks.length})`;

    // Resumability: skip chunks completed earlier in this process.
    if (isChunkDone('gen', regionCode, start)) {
      continue;
    }

    try {
      log(` ${label} — fetching...`);
      const fuelData = await eia.getFuelTypeData(regionCode, { start, end });
      // Drop points with no generation value; the predicate narrows generationMw.
      const validPoints = fuelData.filter((p): p is typeof p & { generationMw: number } => p.generationMw !== null);

      if (validPoints.length === 0) {
        log(` ${label} — 0 data points, skipping`);
        // Mark done so a later pass doesn't re-request a known-empty chunk.
        markChunkDone('gen', regionCode, start);
        await sleep(REQUEST_DELAY_MS);
        continue;
      }

      // Build upsert rows
      const rows = validPoints.map(point => ({
        regionId,
        fuelType: point.fuelType,
        generationMw: point.generationMw,
        timestamp: point.timestamp,
      }));

      let inserted = 0;
      let updated = 0;
      for (let j = 0; j < rows.length; j += UPSERT_BATCH_SIZE) {
        const batch = rows.slice(j, j + UPSERT_BATCH_SIZE);
        const result = await upsertGenerationBatch(batch);
        inserted += result.inserted;
        updated += result.updated;
      }

      stats.genInserted += inserted;
      stats.genUpdated += updated;
      log(` ${label} — ${inserted} inserted, ${updated} updated (${validPoints.length} points)`);
      markChunkDone('gen', regionCode, start);
    } catch (err) {
      stats.genErrors++;
      log(` ${label} — ERROR: ${err instanceof Error ? err.message : String(err)}`);
    }

    // Throttle between chunk requests to stay under API rate limits.
    await sleep(REQUEST_DELAY_MS);
  }
}
|
|
|
|
async function upsertGenerationBatch(
|
|
rows: Array<{ regionId: string; fuelType: string; generationMw: number; timestamp: Date }>,
|
|
): Promise<UpsertResult> {
|
|
if (rows.length === 0) return { inserted: 0, updated: 0 };
|
|
|
|
const values: unknown[] = [];
|
|
const placeholders: string[] = [];
|
|
|
|
for (let i = 0; i < rows.length; i++) {
|
|
const row = rows[i]!;
|
|
const offset = i * 4;
|
|
placeholders.push(
|
|
`(gen_random_uuid(), $${offset + 1}::uuid, $${offset + 2}, $${offset + 3}, $${offset + 4}::timestamptz)`,
|
|
);
|
|
values.push(row.regionId, row.fuelType, row.generationMw, row.timestamp);
|
|
}
|
|
|
|
const sql = `
|
|
WITH upserted AS (
|
|
INSERT INTO generation_mix (id, region_id, fuel_type, generation_mw, timestamp)
|
|
VALUES ${placeholders.join(',\n')}
|
|
ON CONFLICT (region_id, fuel_type, timestamp) DO UPDATE SET
|
|
generation_mw = EXCLUDED.generation_mw
|
|
RETURNING (xmax = 0) AS is_insert
|
|
)
|
|
SELECT
|
|
COUNT(*) FILTER (WHERE is_insert) AS inserted,
|
|
COUNT(*) FILTER (WHERE NOT is_insert) AS updated
|
|
FROM upserted
|
|
`;
|
|
|
|
const result = await prisma.$queryRawUnsafe<Array<{ inserted: bigint; updated: bigint }>>(sql, ...values);
|
|
const row = result[0]!;
|
|
return { inserted: Number(row.inserted), updated: Number(row.updated) };
|
|
}
|
|
|
|
async function backfillGeneration(): Promise<void> {
|
|
log('=== Backfilling generation mix data (10-year) ===');
|
|
|
|
const gridRegions = await prisma.gridRegion.findMany({
|
|
select: { id: true, code: true },
|
|
});
|
|
const regionIdByCode = new Map(gridRegions.map(r => [r.code, r.id]));
|
|
|
|
const end = todayIso();
|
|
const chunks = generateMonthChunks(BACKFILL_START, end);
|
|
log(` ${chunks.length} monthly chunks from ${BACKFILL_START} to ${end}`);
|
|
|
|
const regionTasks = ALL_REGIONS.map(regionCode => {
|
|
return async () => {
|
|
const regionId = regionIdByCode.get(regionCode);
|
|
if (!regionId) {
|
|
log(` SKIP ${regionCode} — no grid_region row found`);
|
|
return;
|
|
}
|
|
await backfillGenerationForRegion(regionCode, regionId, chunks);
|
|
};
|
|
});
|
|
|
|
await runWithConcurrency(regionTasks, CONCURRENCY);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
// Commodity prices backfill
// ---------------------------------------------------------------------------

/** One normalized commodity price observation ready for upsert. */
interface CommodityRow {
  commodity: string; // commodity identifier as reported by the source
  price: number; // price value, expressed in `unit`
  unit: string; // price unit as reported by the source
  timestamp: Date; // observation timestamp
  source: string; // 'EIA' or 'FRED' — used for dedupe preference
}
|
|
|
|
/**
 * Backfill commodity prices (natural gas, WTI crude, coal) from EIA and
 * FRED over the full window, dedupe overlapping series (EIA preferred over
 * FRED for the same commodity + timestamp), and bulk-upsert the result.
 *
 * Each fetch is independent: EIA failures are caught and logged; FRED
 * helpers return a result object ({ ok, data } / { ok, error }) and errors
 * are logged. Whatever was fetched successfully is still written.
 */
async function backfillCommodities(): Promise<void> {
  log('=== Backfilling commodity prices (10-year) ===');

  const start = BACKFILL_START;
  const end = todayIso();
  const rows: CommodityRow[] = [];

  // EIA: Natural Gas (Henry Hub)
  log(' Fetching EIA natural gas prices...');
  try {
    const gasData = await eia.getNaturalGasPrice({ start, end });
    for (const p of gasData) {
      if (p.price === null) continue; // skip gaps in the series
      rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
    }
    log(` EIA natural gas: ${gasData.length} raw records`);
  } catch (err) {
    log(` ERROR EIA natural gas: ${err instanceof Error ? err.message : String(err)}`);
  }

  // Throttle between source requests.
  await sleep(REQUEST_DELAY_MS);

  // EIA: WTI Crude
  log(' Fetching EIA WTI crude prices...');
  try {
    const oilData = await eia.getWTICrudePrice({ start, end });
    for (const p of oilData) {
      if (p.price === null) continue; // skip gaps in the series
      rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
    }
    log(` EIA WTI crude: ${oilData.length} raw records`);
  } catch (err) {
    log(` ERROR EIA WTI crude: ${err instanceof Error ? err.message : String(err)}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // FRED: Natural Gas (DHHNGSP)
  log(' Fetching FRED natural gas prices...');
  const fredGas = await fred.getNaturalGasPrice(start, end);
  if (fredGas.ok) {
    for (const p of fredGas.data) {
      rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
    }
    log(` FRED natural gas: ${fredGas.data.length} records`);
  } else {
    log(` ERROR FRED natural gas: ${fredGas.error}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // FRED: WTI Crude (DCOILWTICO)
  log(' Fetching FRED WTI crude prices...');
  const fredOil = await fred.getWTICrudePrice(start, end);
  if (fredOil.ok) {
    for (const p of fredOil.data) {
      rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
    }
    log(` FRED WTI crude: ${fredOil.data.length} records`);
  } else {
    log(` ERROR FRED WTI crude: ${fredOil.error}`);
  }

  await sleep(REQUEST_DELAY_MS);

  // FRED: Coal (PCOALAUUSDM)
  log(' Fetching FRED coal prices...');
  const fredCoal = await fred.getCoalPrice(start, end);
  if (fredCoal.ok) {
    for (const p of fredCoal.data) {
      rows.push({ commodity: p.commodity, price: p.price, unit: p.unit, timestamp: p.timestamp, source: p.source });
    }
    log(` FRED coal: ${fredCoal.data.length} records`);
  } else {
    log(` ERROR FRED coal: ${fredCoal.error}`);
  }

  if (rows.length === 0) {
    log(' No commodity data fetched');
    return;
  }

  // Deduplicate: for same commodity + timestamp, prefer EIA over FRED
  const deduped = new Map<string, CommodityRow>();
  for (const row of rows) {
    const key = `${row.commodity}:${row.timestamp.getTime()}`;
    const existing = deduped.get(key);
    // Keep first-seen unless an EIA row can replace a FRED row.
    if (!existing || (existing.source === 'FRED' && row.source === 'EIA')) {
      deduped.set(key, row);
    }
  }
  const uniqueRows = [...deduped.values()];

  // Batch upsert commodities
  let totalInserted = 0;
  let totalUpdated = 0;
  for (let i = 0; i < uniqueRows.length; i += UPSERT_BATCH_SIZE) {
    const batch = uniqueRows.slice(i, i + UPSERT_BATCH_SIZE);
    const result = await upsertCommodityBatch(batch);
    totalInserted += result.inserted;
    totalUpdated += result.updated;
  }

  stats.commodityInserted = totalInserted;
  stats.commodityUpdated = totalUpdated;
  log(` Commodities: ${totalInserted} inserted, ${totalUpdated} updated (${uniqueRows.length} unique rows)`);
}
|
|
|
|
async function upsertCommodityBatch(rows: CommodityRow[]): Promise<UpsertResult> {
|
|
if (rows.length === 0) return { inserted: 0, updated: 0 };
|
|
|
|
const values: unknown[] = [];
|
|
const placeholders: string[] = [];
|
|
|
|
for (let i = 0; i < rows.length; i++) {
|
|
const row = rows[i]!;
|
|
const offset = i * 5;
|
|
placeholders.push(
|
|
`(gen_random_uuid(), $${offset + 1}, $${offset + 2}, $${offset + 3}, $${offset + 4}::timestamptz, $${offset + 5})`,
|
|
);
|
|
values.push(row.commodity, row.price, row.unit, row.timestamp, row.source);
|
|
}
|
|
|
|
const sql = `
|
|
WITH upserted AS (
|
|
INSERT INTO commodity_prices (id, commodity, price, unit, timestamp, source)
|
|
VALUES ${placeholders.join(',\n')}
|
|
ON CONFLICT (commodity, timestamp) DO UPDATE SET
|
|
price = EXCLUDED.price,
|
|
unit = EXCLUDED.unit,
|
|
source = EXCLUDED.source
|
|
RETURNING (xmax = 0) AS is_insert
|
|
)
|
|
SELECT
|
|
COUNT(*) FILTER (WHERE is_insert) AS inserted,
|
|
COUNT(*) FILTER (WHERE NOT is_insert) AS updated
|
|
FROM upserted
|
|
`;
|
|
|
|
const result = await prisma.$queryRawUnsafe<Array<{ inserted: bigint; updated: bigint }>>(sql, ...values);
|
|
const row = result[0]!;
|
|
return { inserted: Number(row.inserted), updated: Number(row.updated) };
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Materialized view refresh
|
|
// ---------------------------------------------------------------------------
|
|
|
|
const MATERIALIZED_VIEWS = [
|
|
'electricity_prices_daily',
|
|
'electricity_prices_weekly',
|
|
'generation_mix_daily',
|
|
'generation_mix_weekly',
|
|
] as const;
|
|
|
|
async function refreshMaterializedViews(): Promise<void> {
|
|
log('=== Refreshing materialized views ===');
|
|
for (const view of MATERIALIZED_VIEWS) {
|
|
try {
|
|
log(` Refreshing ${view}...`);
|
|
await prisma.$executeRawUnsafe(`REFRESH MATERIALIZED VIEW CONCURRENTLY ${view}`);
|
|
log(` ${view} refreshed`);
|
|
} catch (err) {
|
|
log(` ERROR refreshing ${view}: ${err instanceof Error ? err.message : String(err)}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Main
|
|
// ---------------------------------------------------------------------------
|
|
|
|
async function main(): Promise<void> {
|
|
const end = todayIso();
|
|
log(`Starting 10-year historical backfill...`);
|
|
log(`Date range: ${BACKFILL_START} to ${end}`);
|
|
log(`Regions: ${ALL_REGIONS.join(', ')} (${ALL_REGIONS.length} total)`);
|
|
log(`Concurrency: ${CONCURRENCY} regions in parallel`);
|
|
log('');
|
|
|
|
if (!onlyCommodities && !skipDemand) {
|
|
await backfillElectricity();
|
|
log('');
|
|
}
|
|
|
|
if (!onlyCommodities && !skipGeneration) {
|
|
await backfillGeneration();
|
|
log('');
|
|
}
|
|
|
|
if (!skipCommodities) {
|
|
await backfillCommodities();
|
|
log('');
|
|
}
|
|
|
|
// Refresh materialized views after data load
|
|
await refreshMaterializedViews();
|
|
log('');
|
|
|
|
log('=== Backfill Summary ===');
|
|
log(` Demand: ${stats.demandInserted} inserted, ${stats.demandUpdated} updated, ${stats.demandErrors} errors`);
|
|
log(` Generation: ${stats.genInserted} inserted, ${stats.genUpdated} updated, ${stats.genErrors} errors`);
|
|
log(` Commodities: ${stats.commodityInserted} inserted, ${stats.commodityUpdated} updated`);
|
|
log('Backfill complete.');
|
|
}
|
|
|
|
main()
|
|
.catch(err => {
|
|
console.error('Backfill failed:', err);
|
|
process.exit(1);
|
|
})
|
|
.finally(() => {
|
|
void prisma.$disconnect();
|
|
});
|