# Source code for ufp.core.output

"""
Normalized output structures for UFP predictions.

This module defines the per-system and batched result containers used across
calculators, training, and adapter integrations.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, Optional, Sequence

import numpy as np
import torch
from ase.stress import full_3x3_to_voigt_6_stress, voigt_6_to_full_3x3_stress

from ufp.core._arrays import ArrayLike, _to_numpy, _to_tensor
from ufp.core.input import UFPInput


@dataclass
class UFPOutput:
    """
    Result container handed back by :class:`UFPotential`.

    Every field is optional; a field left as ``None`` means that quantity was
    not produced.

    Attributes:
        energy: Total energies with shape ``(n_systems,)`` or ``(n_systems, 1)``.
        forces: Atomic forces with shape ``(n_atoms, 3)``.
        stress: Stress with shape ``(n_systems, 3, 3)``, ``(3, 3)``, or Voigt ``(6,)``.
        per_atom_energy: Per-atom energies with shape ``(n_atoms,)`` or ``(n_atoms, 1)``.
        features: Optional named feature tensors.
    """

    energy: Optional[ArrayLike] = None
    forces: Optional[ArrayLike] = None
    stress: Optional[ArrayLike] = None
    per_atom_energy: Optional[ArrayLike] = None
    features: Dict[str, ArrayLike] = field(default_factory=dict)

    def as_ase_results(self, n_atoms: Optional[int] = None) -> Dict[str, object]:
        """Translate the stored quantities into an ASE calculator results dict.

        Only quantities that are not ``None`` appear in the returned mapping.
        The keys written are ``"energy"``/``"free_energy"`` (same float),
        ``"forces"``, ``"stress"`` (Voigt 6-vector) and ``"energies"``
        (flattened per-atom energies).

        Args:
            n_atoms: When given, the leading dimension of ``forces`` and the
                length of the flattened ``per_atom_energy`` are checked
                against it.

        Returns:
            A dictionary holding the converted quantities.

        Raises:
            ValueError: If ``energy`` is not a single value, or ``forces``,
                ``stress`` or ``per_atom_energy`` have an unsupported shape.
        """
        ase_results: Dict[str, object] = {}
        check_atom_count = n_atoms is not None

        if self.energy is not None:
            flat_energy = _to_numpy(self.energy).reshape(-1)
            if flat_energy.size != 1:
                raise ValueError("`energy` must contain a single scalar value")
            # Both ASE keys carry the same scalar.
            for key in ("energy", "free_energy"):
                ase_results[key] = float(flat_energy[0])

        if self.forces is not None:
            force_array = _to_numpy(self.forces)
            if not (force_array.ndim == 2 and force_array.shape[1] == 3):
                raise ValueError("`forces` must have shape (n_atoms, 3)")
            if check_atom_count and force_array.shape[0] != n_atoms:
                raise ValueError(
                    f"`forces` must have shape ({n_atoms}, 3), got {force_array.shape}"
                )
            ase_results["forces"] = force_array

        if self.stress is not None:
            stress_array = _to_numpy(self.stress)
            # A single-system batch is unwrapped to a plain 3x3 matrix first.
            if stress_array.shape == (1, 3, 3):
                stress_array = stress_array[0]
            if stress_array.shape == (3, 3):
                stress_array = full_3x3_to_voigt_6_stress(stress_array)
            elif stress_array.shape != (6,):
                raise ValueError("`stress` must have shape (3, 3), (1, 3, 3), or (6,)")
            ase_results["stress"] = stress_array

        if self.per_atom_energy is not None:
            atom_energies = _to_numpy(self.per_atom_energy).reshape(-1)
            n_entries = atom_energies.shape[0]
            if check_atom_count and n_entries != n_atoms:
                raise ValueError(
                    "`per_atom_energy` must have one entry per atom, got "
                    f"{n_entries} entries for {n_atoms} atoms"
                )
            ase_results["energies"] = atom_energies

        return ase_results
def _pack_array_by_system(
    values: ArrayLike,
    system_sizes: Sequence[int],
    *,
    fill_value: float = 0.0,
) -> tuple[ArrayLike, ArrayLike]:
    """Pack a flat atomwise array into a padded per-system array plus atom mask.

    Rows of ``values`` are assumed to be ordered system by system, so the
    first ``system_sizes[0]`` rows belong to system 0, the next
    ``system_sizes[1]`` rows to system 1, and so on.

    Args:
        values: Array whose leading dimension enumerates atoms. A
            ``torch.Tensor`` is packed with torch (keeping its dtype and
            device); anything else is coerced with ``np.asarray``.
        system_sizes: Number of atoms in each system.
        fill_value: Value written into the padding slots.

    Returns:
        A ``(packed, mask)`` pair. ``packed`` has shape
        ``(n_systems, max_atoms, *values.shape[1:])`` and ``mask`` is a
        boolean array of shape ``(n_systems, max_atoms)`` that is ``True``
        where ``packed`` holds a real atom. Both are torch tensors when
        ``values`` is a tensor and NumPy arrays otherwise.

    Raises:
        ValueError: If ``system_sizes`` does not sum to ``values.shape[0]``.
    """
    sizes = [int(size) for size in system_sizes]
    n_systems = len(sizes)
    # ``default=0`` lets an empty batch produce empty outputs instead of
    # failing inside ``max``.
    max_atoms = max(sizes, default=0)

    is_tensor = isinstance(values, torch.Tensor)
    if not is_tensor:
        # Coerce once so lists/tuples work and ``.shape``/``.dtype`` exist.
        values = np.asarray(values)

    total_atoms = sum(sizes)
    if values.ndim < 1 or values.shape[0] != total_atoms:
        # Without this check a mismatch is either silently truncated or
        # silently broadcast by the slice assignment below.
        raise ValueError(
            "`system_sizes` must sum to the leading dimension of `values`, got "
            f"sizes totalling {total_atoms} for values of shape {tuple(values.shape)}"
        )

    packed_shape = (n_systems, max_atoms, *values.shape[1:])
    mask_shape = (n_systems, max_atoms)
    if is_tensor:
        packed = torch.full(
            packed_shape,
            fill_value,
            dtype=values.dtype,
            device=values.device,
        )
        mask = torch.zeros(mask_shape, dtype=torch.bool, device=values.device)
    else:
        packed = np.full(packed_shape, fill_value, dtype=values.dtype)
        mask = np.zeros(mask_shape, dtype=bool)

    # The slice-assignment syntax is identical for torch and NumPy, so a
    # single loop fills both kinds of container.
    start = 0
    for system_i, size in enumerate(sizes):
        stop = start + size
        packed[system_i, :size] = values[start:stop]
        mask[system_i, :size] = True
        start = stop
    return packed, mask
@dataclass
class UFPBatchOutput:
    """
    Padded, per-system view of a prediction over several ASE or metatomic systems.

    Attributes:
        energy: Per-system total energies.
        forces: Padded per-system forces.
        stress: Per-system stress tensors.
        per_atom_energy: Padded per-system per-atom energies.
        features: Named feature tensors, padded when they are atomwise.
        atom_mask: Boolean mask identifying real atoms in padded atomwise outputs.
        system_sizes: Number of atoms in each system.
    """

    energy: Optional[ArrayLike] = None
    forces: Optional[ArrayLike] = None
    stress: Optional[ArrayLike] = None
    per_atom_energy: Optional[ArrayLike] = None
    features: Dict[str, ArrayLike] = field(default_factory=dict)
    atom_mask: Optional[ArrayLike] = None
    system_sizes: Optional[ArrayLike] = None

    @classmethod
    def from_output(cls, output: UFPOutput, inputs: UFPInput) -> "UFPBatchOutput":
        """Build the padded batch representation from a flat ``UFPOutput``.

        Energies are reshaped to one row per system (and squeezed to 1-D when
        each row has a single entry). Forces, per-atom energies, and any
        feature whose leading dimension equals ``inputs.n_atoms`` are padded
        per system via ``_pack_array_by_system``; other features and the
        stress are passed through untouched. ``atom_mask`` is the mask from
        the first atomwise quantity that gets packed, or ``None`` when
        nothing is packed.

        Args:
            output: Flat prediction to convert.
            inputs: Batch description supplying ``system_sizes``,
                ``n_systems``, ``n_atoms`` and ``device``.

        Returns:
            The batch-oriented output.
        """
        sizes = inputs.system_sizes
        sizes_tensor: ArrayLike = torch.as_tensor(
            sizes,
            dtype=torch.int64,
            device=inputs.device,
        )

        batch_energy = output.energy
        if batch_energy is not None:
            batch_energy = batch_energy.reshape(inputs.n_systems, -1)
            if batch_energy.shape[1] == 1:
                batch_energy = batch_energy[:, 0]

        shared_mask: Optional[ArrayLike] = None

        def pad(flat: ArrayLike) -> ArrayLike:
            """Pad one atomwise array and remember the first mask produced."""
            nonlocal shared_mask
            padded, mask = _pack_array_by_system(flat, sizes)
            if shared_mask is None:
                shared_mask = mask
            return padded

        batch_forces = None if output.forces is None else pad(output.forces)
        batch_per_atom = (
            None if output.per_atom_energy is None else pad(output.per_atom_energy)
        )

        # A feature counts as atomwise when its leading dimension matches the
        # total atom count; everything else is forwarded as-is.
        batch_features: Dict[str, ArrayLike] = {
            name: (
                pad(values)
                if values.ndim >= 1 and values.shape[0] == inputs.n_atoms
                else values
            )
            for name, values in output.features.items()
        }

        return cls(
            energy=batch_energy,
            forces=batch_forces,
            stress=output.stress,
            per_atom_energy=batch_per_atom,
            features=batch_features,
            atom_mask=shared_mask,
            system_sizes=sizes_tensor,
        )
def _coerce_energy(values: ArrayLike, inputs: UFPInput) -> torch.Tensor:
    """Normalize energy-like values to shape ``(n_systems,)``.

    Args:
        values: Energies holding exactly one value per system, in any shape.
        inputs: Batch description; its ``dtype``, ``device`` and ``n_systems``
            are used.

    Returns:
        A 1-D tensor with one total energy per system.

    Raises:
        ValueError: If ``values`` does not hold exactly ``n_systems`` entries.
    """
    tensor = _to_tensor(values, dtype=inputs.dtype, device=inputs.device)
    # Checking the element count up front reports every mismatch (including
    # counts that are not a multiple of ``n_systems``) as the same ValueError
    # instead of leaking a torch reshape error.
    if tensor.numel() != inputs.n_systems:
        raise ValueError("term energy must provide one total energy per system")
    return tensor.reshape(inputs.n_systems)


def _coerce_forces(values: ArrayLike, inputs: UFPInput) -> torch.Tensor:
    """Normalize force-like values to shape ``(n_atoms, 3)``.

    Args:
        values: Forces; must already have shape ``(n_atoms, 3)``.
        inputs: Batch description; its ``dtype``, ``device`` and ``n_atoms``
            are used.

    Returns:
        The forces as a tensor of shape ``(n_atoms, 3)``.

    Raises:
        ValueError: If the shape is anything other than ``(n_atoms, 3)``.
    """
    tensor = _to_tensor(values, dtype=inputs.dtype, device=inputs.device)
    shape = tuple(tensor.shape)
    if shape != (inputs.n_atoms, 3):
        raise ValueError(
            "term forces must have shape "
            f"({inputs.n_atoms}, 3), got {shape}"
        )
    return tensor


def _coerce_per_atom_energy(values: ArrayLike, inputs: UFPInput) -> torch.Tensor:
    """Normalize per-atom energies to shape ``(n_atoms,)``.

    Args:
        values: Per-atom energies of shape ``(n_atoms,)`` or ``(n_atoms, 1)``.
        inputs: Batch description; its ``dtype``, ``device`` and ``n_atoms``
            are used.

    Returns:
        A 1-D tensor with one energy per atom.

    Raises:
        ValueError: If the shape is neither ``(n_atoms,)`` nor ``(n_atoms, 1)``.
    """
    tensor = _to_tensor(values, dtype=inputs.dtype, device=inputs.device)
    shape = tuple(tensor.shape)
    if shape == (inputs.n_atoms,):
        return tensor
    if shape == (inputs.n_atoms, 1):
        return tensor[:, 0]
    raise ValueError(
        "term per-atom energy must have shape "
        f"({inputs.n_atoms},) or ({inputs.n_atoms}, 1), got {shape}"
    )


def _coerce_stress(values: ArrayLike, inputs: UFPInput) -> torch.Tensor:
    """Normalize stress-like values to per-system ``(3, 3)`` tensors.

    Accepted layouts are ``(n_systems, 3, 3)`` for any batch size and, for a
    single-system batch only, ``(3, 3)`` or a Voigt 6-vector ordered
    ``(xx, yy, zz, yz, xz, xy)``.

    Args:
        values: Stress in one of the accepted layouts.
        inputs: Batch description; its ``dtype``, ``device`` and ``n_systems``
            are used.

    Returns:
        A tensor of shape ``(n_systems, 3, 3)``.

    Raises:
        ValueError: If the layout is not supported for the batch size.
    """
    tensor = _to_tensor(values, dtype=inputs.dtype, device=inputs.device)
    shape = tuple(tensor.shape)
    if shape == (inputs.n_systems, 3, 3):
        return tensor
    if inputs.n_systems != 1:
        raise ValueError("term stress must provide one 3x3 tensor per system")
    # From here on n_systems == 1; a (1, 3, 3) input was already returned by
    # the first check above, so only the un-batched layouts remain.
    if shape == (3, 3):
        return tensor.unsqueeze(0)
    if shape == (6,):
        # Expand Voigt -> symmetric 3x3 with a pure torch gather, which gives
        # [[xx, xy, xz], [xy, yy, yz], [xz, yz, zz]]. Staying in torch keeps
        # the tensor on its device and preserves autograd history, both of
        # which a round-trip through NumPy would lose.
        full = tensor[[0, 5, 4, 5, 1, 3, 4, 3, 2]].reshape(3, 3)
        return full.unsqueeze(0)
    raise ValueError("term stress must have shape (3, 3), (1, 3, 3), or (6,)")
def sum_outputs(outputs: Sequence[UFPOutput], inputs: UFPInput) -> UFPOutput:
    """Add up several term outputs evaluated on the same input.

    Each quantity is first normalized with the matching ``_coerce_*`` helper
    (features with ``_to_tensor``) and then accumulated. Energies, per-atom
    energies and features are summed over whichever terms provide them.
    Forces and stress are only reported when *every* term provides them;
    if any term leaves one out, that quantity is ``None`` in the result.

    Args:
        outputs: Term outputs to combine.
        inputs: The input all terms were evaluated on; supplies the dtype,
            device and expected shapes for normalization.

    Returns:
        A single ``UFPOutput`` holding the accumulated quantities.
    """

    def accumulate(
        running: Optional[torch.Tensor], term: torch.Tensor
    ) -> torch.Tensor:
        """Start the running total with ``term`` or add ``term`` to it."""
        return term if running is None else running + term

    total_energy: Optional[torch.Tensor] = None
    total_forces: Optional[torch.Tensor] = None
    total_stress: Optional[torch.Tensor] = None
    total_per_atom: Optional[torch.Tensor] = None
    summed_features: dict[str, torch.Tensor] = {}
    every_term_has_forces = True
    every_term_has_stress = True

    for term in outputs:
        if term.energy is not None:
            total_energy = accumulate(
                total_energy, _coerce_energy(term.energy, inputs)
            )

        if term.forces is None:
            every_term_has_forces = False
        else:
            total_forces = accumulate(
                total_forces, _coerce_forces(term.forces, inputs)
            )

        if term.stress is None:
            every_term_has_stress = False
        else:
            total_stress = accumulate(
                total_stress, _coerce_stress(term.stress, inputs)
            )

        if term.per_atom_energy is not None:
            total_per_atom = accumulate(
                total_per_atom,
                _coerce_per_atom_energy(term.per_atom_energy, inputs),
            )

        for name, values in term.features.items():
            feature = _to_tensor(values, dtype=inputs.dtype, device=inputs.device)
            # Features sharing a name across terms are added together.
            summed_features[name] = accumulate(summed_features.get(name), feature)

    return UFPOutput(
        energy=total_energy,
        forces=total_forces if every_term_has_forces else None,
        stress=total_stress if every_term_has_stress else None,
        per_atom_energy=total_per_atom,
        features=summed_features,
    )
# Names exported by a star-import of this module.
__all__ = [
    "UFPBatchOutput",
    "UFPOutput",
    "sum_outputs",
]