"""Prediction and result-analysis helpers for UFP models."""
from __future__ import annotations
from pathlib import Path
from statistics import NormalDist
from typing import Sequence
import ase
import numpy as np
import torch
from ufp.core.potential import BatchedUFPotential
from ufp.terms import UFPModel
FloatArrayLike = Sequence[float] | np.ndarray
IntArrayLike = Sequence[int] | np.ndarray
DEFAULT_UNCERTAINTY_COVERAGES = (0.50, 0.68, 0.90, 0.95)
def _resolve_device(model: UFPModel, device: torch.device | str | None) -> torch.device:
if device is not None:
return torch.device(device)
try:
return next(model.parameters()).device
except StopIteration:
return torch.device("cpu")
[docs]
def predict_atoms(
model: UFPModel,
frames: Sequence[ase.Atoms],
*,
batch_size: int = 1024,
device: torch.device | str | None = None,
dtype: torch.dtype | None = None,
empty_cuda_cache: bool = False,
progress: bool = False,
) -> tuple[np.ndarray, tuple[np.ndarray, ...]]:
"""Predict total energies and forces for ASE structures."""
if batch_size <= 0:
raise ValueError("`batch_size` must be positive")
resolved_device = _resolve_device(model, device)
resolved_dtype = model.preferred_dtype() if dtype is None else dtype
model.to(device=resolved_device, dtype=resolved_dtype)
model.eval()
batched = BatchedUFPotential(model)
batched.eval()
predicted_energies: list[np.ndarray] = []
predicted_forces: list[np.ndarray] = []
selected_frames = [frame.copy() for frame in frames]
n_batches = max((len(selected_frames) + batch_size - 1) // batch_size, 1)
if progress:
print(
"Running prediction: "
f"{len(selected_frames)} structures in {n_batches} batches, "
f"device={resolved_device}, dtype={resolved_dtype}..."
)
for batch_index, start in enumerate(
range(0, len(selected_frames), batch_size),
start=1,
):
batch = selected_frames[start : start + batch_size]
if progress:
print(f"Predicting batch {batch_index}/{n_batches}...")
context = (
torch.inference_mode() if batched.provides_forces() else torch.enable_grad()
)
with context:
result = batched.compute_batch(
batch,
device=resolved_device,
dtype=resolved_dtype,
derive_forces=True,
)
energy = result.energy
forces = result.forces
system_sizes = result.system_sizes
if isinstance(energy, torch.Tensor):
energy = energy.detach().cpu().numpy()
if isinstance(forces, torch.Tensor):
forces = forces.detach().cpu().numpy()
if isinstance(system_sizes, torch.Tensor):
system_sizes = system_sizes.detach().cpu().numpy()
if forces is None:
raise RuntimeError("prediction did not return forces")
predicted_energies.append(np.asarray(energy, dtype=float))
for system_i, size in enumerate(np.asarray(system_sizes, dtype=int)):
predicted_forces.append(np.asarray(forces[system_i, :size], dtype=float))
del result, energy, forces, system_sizes
if empty_cuda_cache and resolved_device.type == "cuda":
torch.cuda.empty_cache()
if progress:
print("Finished prediction.")
return np.concatenate(predicted_energies), tuple(predicted_forces)
[docs]
def profile_prediction_batch(
model: UFPModel,
frames: Sequence[ase.Atoms],
*,
batch_size: int = 1024,
device: torch.device | str | None = None,
dtype: torch.dtype | None = None,
warmup: int = 2,
active: int = 5,
row_limit: int = 25,
trace_path: Path | str | None = None,
) -> str:
"""Profile repeated prediction of one ASE batch with torch profiler."""
if batch_size <= 0:
raise ValueError("`batch_size` must be positive")
if warmup < 0:
raise ValueError("`warmup` must be non-negative")
if active <= 0:
raise ValueError("`active` must be positive")
selected_frames = [frame.copy() for frame in list(frames)[:batch_size]]
if not selected_frames:
raise ValueError("`frames` must contain at least one structure")
resolved_device = _resolve_device(model, device)
resolved_dtype = model.preferred_dtype() if dtype is None else dtype
model.to(device=resolved_device, dtype=resolved_dtype)
model.eval()
batched = BatchedUFPotential(model)
batched.eval()
def run_once() -> None:
context = (
torch.inference_mode() if batched.provides_forces() else torch.enable_grad()
)
with context:
result = batched.compute_batch(
selected_frames,
device=resolved_device,
dtype=resolved_dtype,
derive_forces=True,
)
del result
if resolved_device.type == "cuda":
torch.cuda.synchronize(resolved_device)
for _ in range(warmup):
run_once()
activities = [torch.profiler.ProfilerActivity.CPU]
sort_by = "cpu_time_total"
if resolved_device.type == "cuda" and torch.cuda.is_available():
activities.append(torch.profiler.ProfilerActivity.CUDA)
sort_by = "cuda_time_total"
with torch.profiler.profile(
activities=activities,
record_shapes=True,
profile_memory=True,
with_stack=False,
) as profiler:
for _ in range(active):
run_once()
if trace_path is not None:
profiler.export_chrome_trace(str(trace_path))
table = profiler.key_averages().table(
sort_by=sort_by,
row_limit=row_limit,
)
print(table)
return table
[docs]
def compute_energy_force_metrics(
energies: FloatArrayLike,
forces: Sequence[object],
sizes: IntArrayLike,
predicted_energies: FloatArrayLike,
predicted_forces: Sequence[object],
) -> dict[str, np.ndarray | float]:
"""Compute per-atom energy and force-component RMSE metrics."""
sizes_array = np.asarray(sizes, dtype=float)
target_energy_per_atom = np.asarray(energies, dtype=float) / sizes_array
predicted_energy_per_atom = (
np.asarray(predicted_energies, dtype=float) / sizes_array
)
target_forces = np.concatenate(
[np.asarray(force, dtype=float).reshape(-1) for force in forces]
)
predicted_force_components = np.concatenate(
[np.asarray(force, dtype=float).reshape(-1) for force in predicted_forces]
)
return {
"target_energy_per_atom": target_energy_per_atom,
"predicted_energy_per_atom": predicted_energy_per_atom,
"target_force_components": target_forces,
"predicted_force_components": predicted_force_components,
"rmse_energy_mev_per_atom": float(
1000.0
* np.sqrt(
np.mean((target_energy_per_atom - predicted_energy_per_atom) ** 2)
)
),
"rmse_force_mev_per_angstrom": float(
1000.0 * np.sqrt(np.mean((target_forces - predicted_force_components) ** 2))
),
}
[docs]
def compute_energy_uncertainty_calibration(
true_energies: FloatArrayLike,
predicted_energies: FloatArrayLike,
sizes: IntArrayLike,
energy_variance: FloatArrayLike,
*,
variance_floor: float = 1.0e-12,
coverages: Sequence[float] = DEFAULT_UNCERTAINTY_COVERAGES,
) -> dict[str, np.ndarray | float | int]:
"""Compute per-atom energy uncertainty calibration diagnostics."""
true = np.asarray(true_energies, dtype=float).reshape(-1)
predicted = np.asarray(predicted_energies, dtype=float).reshape(-1)
sizes_array = np.asarray(sizes, dtype=float).reshape(-1)
variance = np.asarray(energy_variance, dtype=float).reshape(-1)
if true.shape != predicted.shape:
raise ValueError("true and predicted energies must have the same shape")
if true.shape != sizes_array.shape:
raise ValueError("sizes must have one entry per energy")
if true.shape != variance.shape:
raise ValueError("energy variance must have one entry per energy")
if np.any(sizes_array <= 0.0):
raise ValueError("sizes must be positive")
if np.any(variance < 0.0):
raise ValueError("energy variances must be non-negative")
if float(variance_floor) <= 0.0:
raise ValueError("variance_floor must be positive")
residual_per_atom = (predicted - true) / sizes_array
variance_per_atom = np.maximum(
variance / (sizes_array * sizes_array),
variance_floor,
)
predicted_std_per_atom = np.sqrt(variance_per_atom)
normalized_residual = residual_per_atom / predicted_std_per_atom
squared_residual = residual_per_atom * residual_per_atom
nll = 0.5 * (
np.log(2.0 * np.pi * variance_per_atom) + squared_residual / variance_per_atom
)
nominal = np.asarray(coverages, dtype=float).reshape(-1)
if np.any((nominal <= 0.0) | (nominal >= 1.0)):
raise ValueError("coverages must be between 0 and 1")
normal = NormalDist()
z_values = np.asarray(
[normal.inv_cdf((1.0 + float(value)) / 2.0) for value in nominal],
dtype=float,
)
empirical = np.asarray(
[np.mean(np.abs(normalized_residual) <= z_value) for z_value in z_values],
dtype=float,
)
denominator = float(np.sum(variance_per_atom * variance_per_atom))
calibration_slope = (
float(np.sum(squared_residual * variance_per_atom) / denominator)
if denominator > 0.0
else float("nan")
)
abs_residual = np.abs(residual_per_atom)
if len(abs_residual) < 2 or np.std(abs_residual) == 0.0:
correlation = float("nan")
elif np.std(predicted_std_per_atom) == 0.0:
correlation = float("nan")
else:
correlation = float(np.corrcoef(abs_residual, predicted_std_per_atom)[0, 1])
return {
"n_structures": int(true.size),
"energy_residual_per_atom": residual_per_atom,
"predicted_energy_std_per_atom": predicted_std_per_atom,
"normalized_energy_residual": normalized_residual,
"nominal_coverage": nominal,
"coverage_z": z_values,
"empirical_coverage": empirical,
"gaussian_nll_per_atom": float(np.mean(nll)),
"rmse_energy_mev_per_atom": float(1000.0 * np.sqrt(np.mean(squared_residual))),
"mae_energy_mev_per_atom": float(1000.0 * np.mean(abs_residual)),
"mean_predicted_std_mev_per_atom": float(
1000.0 * np.mean(predicted_std_per_atom)
),
"normalized_residual_mean": float(np.mean(normalized_residual)),
"normalized_residual_std": float(np.std(normalized_residual)),
"calibration_slope": calibration_slope,
"abs_residual_std_correlation": correlation,
}
[docs]
def fit_energy_variance_scale(
true_energies: FloatArrayLike,
predicted_energies: FloatArrayLike,
sizes: IntArrayLike,
energy_variance: FloatArrayLike,
*,
variance_floor: float = 1.0e-12,
) -> float:
"""Return the scalar variance multiplier minimizing Gaussian energy NLL."""
true = np.asarray(true_energies, dtype=float).reshape(-1)
predicted = np.asarray(predicted_energies, dtype=float).reshape(-1)
sizes_array = np.asarray(sizes, dtype=float).reshape(-1)
variance = np.asarray(energy_variance, dtype=float).reshape(-1)
if true.shape != predicted.shape or true.shape != sizes_array.shape:
raise ValueError("energy arrays and sizes must have matching shapes")
if true.shape != variance.shape:
raise ValueError("energy variance must have one entry per energy")
if np.any(sizes_array <= 0.0):
raise ValueError("sizes must be positive")
if np.any(variance < 0.0):
raise ValueError("energy variances must be non-negative")
if float(variance_floor) <= 0.0:
raise ValueError("variance_floor must be positive")
residual_per_atom = (predicted - true) / sizes_array
variance_per_atom = np.maximum(
variance / (sizes_array * sizes_array),
variance_floor,
)
scale = float(np.mean((residual_per_atom * residual_per_atom) / variance_per_atom))
return max(scale, float(variance_floor))
[docs]
def compute_energy_uncertainty_calibration_file(
npz_path: Path | str,
*,
variance_key: str = "energy_total_variance",
variance_floor: float = 1.0e-12,
coverages: Sequence[float] = DEFAULT_UNCERTAINTY_COVERAGES,
) -> dict[str, np.ndarray | float | int]:
"""Compute energy uncertainty calibration diagnostics from a prediction file."""
path = Path(npz_path)
data = np.load(path)
if variance_key not in data.files:
raise KeyError(f"{path} is missing uncertainty key `{variance_key}`")
size_key = "system_sizes" if "system_sizes" in data.files else "sizes"
if size_key not in data.files:
raise KeyError(f"{path} is missing `system_sizes` or `sizes`")
for key in ("true_energies", "predicted_energies"):
if key not in data.files:
raise KeyError(f"{path} is missing `{key}`")
return compute_energy_uncertainty_calibration(
data["true_energies"],
data["predicted_energies"],
data[size_key],
data[variance_key],
variance_floor=variance_floor,
coverages=coverages,
)
[docs]
def fit_energy_variance_scale_file(
npz_path: Path | str,
*,
variance_key: str = "energy_total_variance",
variance_floor: float = 1.0e-12,
) -> float:
"""Fit an energy variance multiplier from a prediction file."""
path = Path(npz_path)
data = np.load(path)
if variance_key not in data.files:
raise KeyError(f"{path} is missing uncertainty key `{variance_key}`")
size_key = "system_sizes" if "system_sizes" in data.files else "sizes"
if size_key not in data.files:
raise KeyError(f"{path} is missing `system_sizes` or `sizes`")
for key in ("true_energies", "predicted_energies"):
if key not in data.files:
raise KeyError(f"{path} is missing `{key}`")
return fit_energy_variance_scale(
data["true_energies"],
data["predicted_energies"],
data[size_key],
data[variance_key],
variance_floor=variance_floor,
)
[docs]
def compute_force_uncertainty_calibration(
true_force_components: FloatArrayLike,
predicted_force_components: FloatArrayLike,
force_variance: FloatArrayLike,
*,
variance_floor: float = 1.0e-12,
coverages: Sequence[float] = DEFAULT_UNCERTAINTY_COVERAGES,
) -> dict[str, np.ndarray | float | int]:
"""Compute force-component uncertainty calibration diagnostics."""
true = np.asarray(true_force_components, dtype=float).reshape(-1)
predicted = np.asarray(predicted_force_components, dtype=float).reshape(-1)
variance = np.asarray(force_variance, dtype=float).reshape(-1)
if true.shape != predicted.shape:
raise ValueError("true and predicted force components must have same shape")
if true.shape != variance.shape:
raise ValueError("force variance must have one entry per force component")
if np.any(variance < 0.0):
raise ValueError("force variances must be non-negative")
if float(variance_floor) <= 0.0:
raise ValueError("variance_floor must be positive")
residual = predicted - true
effective_variance = np.maximum(variance, variance_floor)
predicted_std = np.sqrt(effective_variance)
normalized = residual / predicted_std
squared_residual = residual * residual
nll = 0.5 * (
np.log(2.0 * np.pi * effective_variance) + squared_residual / effective_variance
)
nominal = np.asarray(coverages, dtype=float).reshape(-1)
if np.any((nominal <= 0.0) | (nominal >= 1.0)):
raise ValueError("coverages must be between 0 and 1")
normal = NormalDist()
z_values = np.asarray(
[normal.inv_cdf((1.0 + float(value)) / 2.0) for value in nominal],
dtype=float,
)
empirical = np.asarray(
[np.mean(np.abs(normalized) <= z_value) for z_value in z_values],
dtype=float,
)
denominator = float(np.sum(effective_variance * effective_variance))
slope = (
float(np.sum(squared_residual * effective_variance) / denominator)
if denominator > 0.0
else float("nan")
)
abs_residual = np.abs(residual)
if len(abs_residual) < 2 or np.std(abs_residual) == 0.0:
correlation = float("nan")
elif np.std(predicted_std) == 0.0:
correlation = float("nan")
else:
correlation = float(np.corrcoef(abs_residual, predicted_std)[0, 1])
return {
"n_force_components": int(true.size),
"force_residual": residual,
"predicted_force_std": predicted_std,
"normalized_force_residual": normalized,
"nominal_coverage": nominal,
"coverage_z": z_values,
"empirical_coverage": empirical,
"gaussian_nll": float(np.mean(nll)),
"rmse_force_mev_per_angstrom": float(
1000.0 * np.sqrt(np.mean(squared_residual))
),
"mae_force_mev_per_angstrom": float(1000.0 * np.mean(abs_residual)),
"mean_predicted_std_mev_per_angstrom": float(1000.0 * np.mean(predicted_std)),
"normalized_residual_mean": float(np.mean(normalized)),
"normalized_residual_std": float(np.std(normalized)),
"calibration_slope": slope,
"abs_residual_std_correlation": correlation,
}
[docs]
def compute_force_uncertainty_calibration_file(
npz_path: Path | str,
*,
variance_key: str = "force_total_variance_components",
variance_floor: float = 1.0e-12,
coverages: Sequence[float] = DEFAULT_UNCERTAINTY_COVERAGES,
) -> dict[str, np.ndarray | float | int]:
"""Compute force calibration diagnostics from a prediction file."""
path = Path(npz_path)
data = np.load(path)
for key in ("true_force_components", "predicted_force_components", variance_key):
if key not in data.files:
raise KeyError(f"{path} is missing `{key}`")
return compute_force_uncertainty_calibration(
data["true_force_components"],
data["predicted_force_components"],
data[variance_key],
variance_floor=variance_floor,
coverages=coverages,
)
[docs]
def print_uncertainty_calibration(
prefix: str,
metrics: dict[str, np.ndarray | float | int],
) -> None:
"""Print compact energy uncertainty calibration diagnostics."""
coverage = ", ".join(
f"{100.0 * nominal:.0f}%->{100.0 * empirical:.1f}%"
for nominal, empirical in zip(
np.asarray(metrics["nominal_coverage"], dtype=float),
np.asarray(metrics["empirical_coverage"], dtype=float),
strict=True,
)
)
print(f"{prefix} uncertainty calibration:")
print(f" structures: {int(metrics['n_structures'])}")
print(
" energy RMSE/std: "
f"{float(metrics['rmse_energy_mev_per_atom']):.3f}/"
f"{float(metrics['mean_predicted_std_mev_per_atom']):.3f} meV/atom"
)
print(
" normalized residual mean/std: "
f"{float(metrics['normalized_residual_mean']):.3f}/"
f"{float(metrics['normalized_residual_std']):.3f}"
)
print(f" Gaussian NLL per atom: {float(metrics['gaussian_nll_per_atom']):.6f}")
print(f" calibration slope: {float(metrics['calibration_slope']):.3f}")
print(
" |residual|-std correlation: "
f"{float(metrics['abs_residual_std_correlation']):.3f}"
)
print(f" empirical coverage: {coverage}")
[docs]
def print_force_uncertainty_calibration(
prefix: str,
metrics: dict[str, np.ndarray | float | int],
) -> None:
"""Print compact force uncertainty calibration diagnostics."""
coverage = ", ".join(
f"{100.0 * nominal:.0f}%->{100.0 * empirical:.1f}%"
for nominal, empirical in zip(
np.asarray(metrics["nominal_coverage"], dtype=float),
np.asarray(metrics["empirical_coverage"], dtype=float),
strict=True,
)
)
print(f"{prefix} force uncertainty calibration:")
print(f" components: {int(metrics['n_force_components'])}")
print(
" force RMSE/std: "
f"{float(metrics['rmse_force_mev_per_angstrom']):.3f}/"
f"{float(metrics['mean_predicted_std_mev_per_angstrom']):.3f} meV/A"
)
print(
" normalized residual mean/std: "
f"{float(metrics['normalized_residual_mean']):.3f}/"
f"{float(metrics['normalized_residual_std']):.3f}"
)
print(f" Gaussian NLL: {float(metrics['gaussian_nll']):.6f}")
print(f" calibration slope: {float(metrics['calibration_slope']):.3f}")
print(
" |residual|-std correlation: "
f"{float(metrics['abs_residual_std_correlation']):.3f}"
)
print(f" empirical coverage: {coverage}")
[docs]
def dimer_curve(
interaction_model: UFPModel,
symbols: tuple[str, str],
*,
r_min: float = 0.0,
r_max: float = 6.0,
n_points: int = 1000,
) -> tuple[np.ndarray, np.ndarray]:
"""Evaluate an interaction model on a two-atom dimer scan."""
distances = np.linspace(r_min, r_max, n_points)
energies = np.zeros_like(distances)
for idx, distance in enumerate(distances):
atoms = ase.Atoms(
symbols=list(symbols),
positions=[[0.0, 0.0, 0.0], [distance, 0.0, 0.0]],
cell=np.eye(3) * 20.0,
pbc=False,
)
output = interaction_model.compute(atoms, derive_forces=False)
assert output.energy is not None
energies[idx] = float(output.energy.detach().cpu().numpy().reshape(-1)[0])
return distances, energies
__all__ = [
"DEFAULT_UNCERTAINTY_COVERAGES",
"compute_force_uncertainty_calibration",
"compute_force_uncertainty_calibration_file",
"compute_energy_uncertainty_calibration",
"compute_energy_uncertainty_calibration_file",
"compute_energy_force_metrics",
"dimer_curve",
"fit_energy_variance_scale",
"fit_energy_variance_scale_file",
"print_force_uncertainty_calibration",
"print_uncertainty_calibration",
"predict_atoms",
"profile_prediction_batch",
]