Source code for irradiapy.analysis.debrismanager

"""This module contains the `DebrisManager` class."""

import warnings
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path

import matplotlib.colors as mcolors
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from matplotlib.gridspec import GridSpec
from numpy.lib.recfunctions import structured_to_unstructured as _str2unstr
from scipy.spatial.transform import Rotation
from sklearn.decomposition import PCA

from irradiapy import config, dtypes, utils
from irradiapy.analysis.clusters import clusterize_atoms
from irradiapy.debris_database import DebrisDatabase
from irradiapy.enums import DamageEnergyMode, DisplacementMode
from irradiapy.io.lammpsreader import LAMMPSReader
from irradiapy.materials import Component, Element


[docs] @dataclass class DebrisManager: """Class used to reconstruct the damage produced by recoils from a database of MD debris. Attributes ---------- recoil : Element Recoil element. component : Component Target material. damage_energy_mode : DamageEnergyMode Mode for recoil to damage energy calculation. displacement_mode : DisplacementMode Mode for calculation of number of displacement atoms. fp_dist : float Distance between the vacancy and the interstitial of a Frenkel pair, in angstroms. fp_energy_abs : float (default=1e3) Absolute recoil energy below which unmatched recoils are represented by Frenkel pairs, in eV. energy_tolerance : float (default=0.1) Tolerance for energy decomposition. For example, if this value if ``0.1``, the recoil energy is 194 keV and the database contains an energy of 200 keV, then 194 will be in the range 200 +/- 20 keV, therefore a cascade of 200 keV will be used, instead of decomposing 194 keV into, for example, 100x1 + 50x1 + 20x2 + 3x1 + 1xFP (Frenkel pairs). This fixes biases towards smaller clusters (lower energies) and helps reducing cascade overlapping. Set to ``0.0`` to disable this feature. seed : int, optional (default=0) Random seed for random number generator. """ recoil: Element component: Component damage_energy_mode: DamageEnergyMode displacement_mode: DisplacementMode fp_dist: float fp_energy_abs: float = 1e3 energy_tolerance: float = 0.1 seed: int = 0 debris_database: DebrisDatabase | None = None compute_damage_energy: bool = field(init=False) __rng: np.random.Generator = field(init=False) __calc_nd: Callable[[float], float] = field(init=False) __files: dict[float, list[Path]] = field(init=False) __energies: npt.NDArray[np.float64] = field(init=False) __nenergies: int = field(init=False) def __post_init__(self) -> None: self.__rng = np.random.default_rng(self.seed) if self.debris_database is None: self.debris_database = config.get_debris_database() self.__files = self.debris_database.matching_files_by_energy( recoil=self.recoil, component=self.component, ) self.__energies = np.array(sorted(self.__files.keys(), reverse=True)) self.__nenergies = len(self.__energies) self.compute_damage_energy = ( self.debris_database.electronic_interactions == "None" ) # Recoil to damage energy conversion self.__compute_damage_energy = ( lambda recoil_energy: self.component.recoil_energy_to_damage_energy( recoil_energy=recoil_energy, recoil=self.recoil, mode=self.damage_energy_mode, ) ) # Select the displacement model for residual energy self.__calc_nd = ( lambda damage_energy: self.component.damage_energy_to_displacements( damage_energy=damage_energy, mode=self.displacement_mode, ) ) def __get_fp_types(self, nfp: int) -> npt.NDArray[np.int32]: """Get the types of Frenkel pairs. Parameters ---------- nfp : int Number of Frenkel pairs. Returns ------- npt.NDArray[np.int32] Array of Frenkel pair types. """ stoichs = np.asarray(self.component.stoichs, dtype=np.float64) cdf = np.cumsum(stoichs) # cumulative distribution function, e.g. [0.5, 1.0] atomic_numbers = np.asarray( [e.atomic_number for e in self.component.elements], dtype=np.int32 ) r = self.__rng.random(nfp) # in [0, 1) idx = np.searchsorted(cdf, r, side="right") types = atomic_numbers[idx] return types def __get_files( self, recoil_energy: float ) -> tuple[dict[float, np.ndarray], int]: """Get cascade files and number of residual FP for a given recoil energy. Parameters ---------- recoil_energy : float Recoil energy. Returns ------- tuple[dict[float, np.ndarray], int] Dictionary of selected paths and number of residual FP. """ if self.__nenergies == 0: damage_energy = self.__compute_damage_energy(recoil_energy) nfp = np.round(self.__calc_nd(damage_energy)).astype(np.int64) return {}, int(nfp) # Decompose the recoil energy into cascades and residual energy # Rounding: if the recoil energy is closer than 10% to any energy of the database, use the # closest energy within that tolerance. This avoids: # Decomposition biasing towards smaller clusters (190 keV = 100 + 50 + 2x20 keV) # Residual FP causing artificial clustering (ignore them) diff = np.abs(self.__energies - recoil_energy) mask = diff <= self.energy_tolerance * self.__energies if np.any(mask): recoil_energy = self.__energies[mask][np.argmin(diff[mask])] residual_energy = ( self.__compute_damage_energy(recoil_energy) if self.compute_damage_energy else recoil_energy ) cascade_counts = np.zeros(self.__nenergies, dtype=np.int64) for i, energy in enumerate(self.__energies): cascade_counts[i], residual_energy = divmod(residual_energy, energy) # Select the files for each energy if residual_energy > 0: residual_energy = self.__compute_damage_energy(residual_energy) debris_files = { energy: self.__rng.choice(self.__files[energy], cascade_counts[i]) for i, energy in enumerate(self.__energies) } # Get the number of residual FP nfp = int(np.round(self.__calc_nd(residual_energy))) return debris_files, nfp
[docs] def get_recoil_debris( self, recoil_energy: float, recoil_pos: npt.NDArray[np.float64], recoil_dir: npt.NDArray[np.float64], ) -> dtypes.Defect: """Get recoil debris from its energy position, and direction. Parameters ---------- recoil_energy : float Recoil energy. recoil_pos : npt.NDArray[np.float64] Recoil position. recoil_dir : npt.NDArray[np.float64] Recoil direction. Returns ------- dtypes.Defect Defects after the cascades. """ files, nfp = self.__get_files(recoil_energy) # Get the maximum energy available in the database for the given recoil. # If no energy is available, return zero to place only FP. db_emax = next( (energy for energy in self.__energies if len(files[energy])), 0.0 ) # Possible to get cascades from the database if db_emax > 0.0: defects = self.__process_highest_energy_cascade( files, db_emax, recoil_pos, recoil_dir, ) parallelepiped = self.__get_parallelepiped(defects) defects = self.__place_other_debris(files, defects, parallelepiped) if nfp: defects = self.__place_fps_in_parallelepiped( defects, nfp, parallelepiped ) return defects # If no energy is available, generate FP only if nfp: defects = self.__place_fps_in_sphere(nfp, recoil_pos, recoil_dir) return defects defects = np.empty(0, dtype=dtypes.defect) return defects
def __process_highest_energy_cascade( self, files: dict[float, np.ndarray], db_emax: float, recoil_pos: npt.NDArray[np.float64], recoil_dir: npt.NDArray[np.float64], ) -> dtypes.Defect: """Process the highest energy cascade. Parameters ---------- files : dict[float, np.ndarray] Dictionary of files for each energy. db_emax : float Energy of the highest energy cascade. recoil_pos : npt.NDArray[np.float64] Recoil position. recoil_dir : npt.NDArray[np.float64] Recoil direction. Returns ------- dtypes.Defect Defects after the highest energy cascade. """ file = files[db_emax][0] files[db_emax] = np.delete(files[db_emax], 0) defects = utils.io.get_last_reader(LAMMPSReader(file))["atoms"] xaxis = np.array([1.0, 0.0, 0.0]) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UserWarning) transform = Rotation.align_vectors([recoil_dir], [xaxis])[0] pos = _str2unstr(defects[["x", "y", "z"]], dtype=np.float64, copy=False) pos = transform.apply(pos) + recoil_pos defects["x"] = pos[:, 0] defects["y"] = pos[:, 1] defects["z"] = pos[:, 2] return defects def __place_other_debris( self, files: dict[float, np.ndarray], defects: dtypes.Defect, parallelepiped: tuple[PCA, npt.NDArray, npt.NDArray], ) -> dtypes.Defect: """Place other debris in the parallelepiped. Parameters ---------- files : dict[float, np.ndarray] Dictionary of files for each energy. defects : dtypes.Defect Defects after the highest energy cascade. parallelepiped : tuple[PCA, npt.NDArray, npt.NDArray] Parallelepiped definition. Returns ------- dtypes.Defect Defects after placing the other debris. """ for energy in self.__energies: for file0 in files[energy]: defects_ = utils.io.get_last_reader(LAMMPSReader(file0))["atoms"] transform = Rotation.random(rng=self.__rng) pos = _str2unstr( defects_[["x", "y", "z"]], dtype=np.float64, copy=False ) pos0 = self.__get_parallelepiped_points(*parallelepiped, 1) pos = transform.apply(pos) + pos0 defects_["x"] = pos[:, 0] defects_["y"] = pos[:, 1] defects_["z"] = pos[:, 2] defects = np.concatenate((defects, defects_)) return defects def __place_fps_in_parallelepiped( self, defects: dtypes.Defect, nfp: int, parallelepiped: tuple[PCA, npt.NDArray, npt.NDArray], ) -> dtypes.Defect: """Place FPs anywhere in the parallelepiped. Parameters ---------- defects : dtypes.Defect Defects after placing the other debris. nfp : int Number of FPs. parallelepiped : tuple[PCA, npt.NDArray, npt.NDArray] Parallelepiped definition. Returns ------- dtypes.Defect Defects after placing the FPs. """ defects_ = np.zeros(2 * nfp, dtype=dtypes.defect) defects_["type"][:nfp] = self.__get_fp_types(nfp) defects_["x"][:nfp] = self.fp_dist / 2.0 pos = _str2unstr(defects_[["x", "y", "z"]], dtype=np.float64, copy=False) pos[:nfp] = Rotation.random(nfp, rng=self.__rng).apply(pos[:nfp]) pos[nfp:] = -pos[:nfp] pos0 = self.__get_parallelepiped_points(*parallelepiped, nfp) pos[:nfp] += pos0 pos[nfp:] += pos0 defects_["x"] = pos[:, 0] defects_["y"] = pos[:, 1] defects_["z"] = pos[:, 2] return np.concatenate((defects, defects_)) def __place_fps_in_sphere( self, nfp: int, recoil_pos: npt.NDArray[np.float64], recoil_dir: npt.NDArray[np.float64], ) -> dtypes.Defect: """Generate FPs in a sphere. Parameters ---------- nfp : int Number of FPs. recoil_pos : npt.NDArray[np.float64] Recoil position. recoil_dir : npt.NDArray[np.float64] Recoil direction. Returns ------- dtypes.Defect Defects after generating. """ defects_ = np.zeros(2 * nfp, dtype=dtypes.defect) defects_["type"][:nfp] = self.__get_fp_types(nfp) defects_["x"][:nfp] = self.fp_dist / 2.0 pos = _str2unstr(defects_[["x", "y", "z"]], dtype=np.float64, copy=False) pos[:nfp] = Rotation.random(nfp, rng=self.__rng).apply(pos[:nfp]) pos[nfp:] = -pos[:nfp] random = self.__rng.random((nfp, 3)) theta = np.arccos(2.0 * random[:, 0] - 1.0) phi = 2.0 * np.pi * random[:, 1] radius = nfp * self.fp_dist / 2.0 r = radius * np.cbrt(random[:, 2]) points = np.empty((nfp, 3)) points[:, 0] = r * np.sin(theta) * np.cos(phi) points[:, 1] = r * np.sin(theta) * np.sin(phi) points[:, 2] = r * np.cos(theta) pos[:nfp] += points pos[nfp:] += points pos += recoil_pos + recoil_dir * radius defects_["x"] = pos[:, 0] defects_["y"] = pos[:, 1] defects_["z"] = pos[:, 2] return defects_ def __get_parallelepiped( self, atoms: dtypes.Atom ) -> tuple[PCA, npt.NDArray[np.float64], npt.NDArray[np.float64]]: """ Define a parallelepiped from the atomic positions using PCA. Parameters ---------- atoms : dtypes.Atom Atomic positions. Returns ------- tuple[PCA, npt.NDArray[np.float64], npt.NDArray[np.float64]] PCA object, minimum PCA coordinates, maximum PCA coordinates. """ pos = _str2unstr(atoms[["x", "y", "z"]], dtype=np.float64, copy=False) pca = PCA(n_components=3) pca.fit(pos) atoms_pca = pca.transform(pos) min_pca = np.min(atoms_pca, axis=0) max_pca = np.max(atoms_pca, axis=0) return pca, min_pca, max_pca def __get_parallelepiped_points( self, pca: PCA, min_pca: npt.NDArray[np.float64], max_pca: npt.NDArray[np.float64], npoints: int, ) -> npt.NDArray[np.float64]: """ Generate random points within a parallelepiped. Parameters ---------- pca : PCA PCA object. min_pca : npt.NDArray[np.float64] Minimum PCA coordinates. max_pca : npt.NDArray[np.float64] Maximum PCA coordinates. npoints : int Number of points to generate. Returns ------- npt.NDArray[np.float64] Random points within the parallelepiped. """ random_points_pca = self.__rng.uniform(min_pca, max_pca, size=(npoints, 3)) return pca.inverse_transform(random_points_pca)
[docs] def displacements_number_plot( self, log: bool = True, show: bool = False, plot_path: Path | None = None, dpi: int = 300, ) -> None: """Plot the number of displacements vs recoil energy. Parameters ---------- log : bool If True, use logarithmic scale for both axes. show : bool, optional (default=False) Whether to show the plot. plot_path : Path | None, optional (default=None) Output path for the plot. dpi : int, optional (default=300) Dots per inch for the plot. """ # Debris from the database recoil_energies = list(self.__files.keys()) if not recoil_energies: raise ValueError("No matching MD debris datasets selected; cannot plot.") nds = [] stds = [] stes = [] for recoil_energy in recoil_energies: files = self.__files[recoil_energy] nfiles = len(files) nd = [] for file in files: defects = utils.io.get_last_reader(LAMMPSReader(file))["atoms"] # print(file) # print(defects) nvacs = len(defects[defects["type"] == 0]) nd.append(nvacs) mean = np.mean(nd) std = np.std(nd) ste = std / np.sqrt(nfiles) nds.append(mean) stds.append(std) stes.append(ste) recoil_energies = np.array(recoil_energies) nds = np.array(nds) stds = np.array(stds) stes = np.array(stes) # Theoretical models dx = ( self.component.ed_min if self.component.ed_min is not None else self.component.ed_avr ) recoil_energies_range = np.arange( 0.0, recoil_energies.max(), dx, ) damage_energies_range = np.array( [ self.component.recoil_energy_to_damage_energy( recoil_energy=recoil_energy, recoil=self.recoil, mode=self.damage_energy_mode, ) for recoil_energy in recoil_energies_range ] ) try: nds_nrt = np.array( [ self.component.damage_energy_to_displacements( damage_energy=damage_energy, mode=DisplacementMode.NRT, ) for damage_energy in damage_energies_range ] ) except Exception as e: # pylint: disable=broad-exception-caught nds_nrt = None print(f"Could not compute NRT model: {e}") try: nds_ferarc = np.array( [ self.component.damage_energy_to_displacements( damage_energy=damage_energy, mode=DisplacementMode.FERARC, ) for damage_energy in damage_energies_range ] ) except Exception as e: # pylint: disable=broad-exception-caught nds_ferarc = None print(f"Could not compute fer-arc model: {e}") # Plot fig = plt.figure() ax = fig.add_subplot() ax.set_xlabel("Recoil energy (keV)") ax.set_ylabel("Number of vacancies") if log: ax.set_xscale("log") ax.set_yscale("log") recoil_energies /= 1e3 recoil_energies_range /= 1e3 ax.errorbar( recoil_energies, nds, yerr=stes, fmt="o", label="debris", ) if nds_nrt is not None: ax.plot( recoil_energies_range, nds_nrt, label="NRT", ) if nds_ferarc is not None: ax.plot( recoil_energies_range, nds_ferarc, label="fer-arc", ) ax.legend() fig.tight_layout() if plot_path: plt.savefig(plot_path, dpi=dpi) if show: plt.show() plt.close()
[docs] def cluster_sizes_plot( self, vacs_cutoff: float, sias_cutoff: float, vacs_binwidth: int = 10, sias_binwidth: int = 10, log: bool = True, show: bool = False, vacs_plot_path: Path | None = None, sias_plot_path: Path | None = None, dpi: int = 300, ) -> None: """Plot the cluster sizes distribution vs recoil energy. Parameters ---------- vacs_cutoff : float Cutoff distance for vacancy clustering, in angstroms. sias_cutoff : float Cutoff distance for interstitial clustering, in angstroms. vacs_binwidth : int, optional (default=10) Bin width for vacancy cluster sizes histogram (bins >=11). sias_binwidth : int, optional (default=10) Bin width for interstitial cluster sizes histogram (bins >=11). log : bool If True, use logarithmic scale for the color map. show : bool, optional (default=False) Whether to show the plot. vacs_plot_path : Path | None, optional (default=None) Output path for the vacancy cluster sizes plot. sias_plot_path : Path | None, optional (default=None) Output path for the interstitial cluster sizes plot. dpi : int, optional (default=300) Dots per inch for the plots. """ recoil_energies: list[float] = sorted(list(self.__files.keys())) if not recoil_energies: raise ValueError("No matching MD debris datasets selected; cannot plot.") vac_sizes: list[npt.NDArray[np.int64]] = [] sia_sizes: list[npt.NDArray[np.int64]] = [] nfiles: dict[int, int] = {} for recoil_energy in recoil_energies: files = self.__files[recoil_energy] nfiles[recoil_energy] = len(files) vac_sizes_files: list[npt.NDArray[np.int64]] = [] sia_sizes_files: list[npt.NDArray[np.int64]] = [] for file in files: defects = utils.io.get_last_reader(LAMMPSReader(file))["atoms"] vacs = defects[defects["type"] == 0] sias = defects[defects["type"] != 0] vac_oclusters = clusterize_atoms(vacs, vacs_cutoff)[-1] sia_oclusters = clusterize_atoms(sias, sias_cutoff)[-1] vac_sizes_files.append(vac_oclusters["size"]) sia_sizes_files.append(sia_oclusters["size"]) vac_sizes.append( np.concatenate(vac_sizes_files) if vac_sizes_files else np.empty(0, dtype=np.int64) ) sia_sizes.append( np.concatenate(sia_sizes_files) if sia_sizes_files else np.empty(0, dtype=np.int64) ) def rounded_max(m: int, binwidth: int) -> int: # keep 1..10 unbinned, bin >=11 by binwidth if m <= 10: return 10 return 10 + int(np.ceil((m - 10) / binwidth) * binwidth) max_vac = max((int(v.max()) if v.size else 0) for v in vac_sizes) max_sia = max((int(s.max()) if s.size else 0) for s in sia_sizes) max_vac = rounded_max(max_vac, vacs_binwidth) max_sia = rounded_max(max_sia, sias_binwidth) vac_edges = np.concatenate( ( np.arange(0.5, 10.5), np.arange(10.5, max_vac + vacs_binwidth + 0.5, vacs_binwidth), ) ) sia_edges = np.concatenate( ( np.arange(0.5, 10.5), np.arange(10.5, max_sia + sias_binwidth + 0.5, sias_binwidth), ) ) recoils_nbins = len(recoil_energies) vhists = np.empty((recoils_nbins, len(vac_edges) - 1), dtype=np.float64) ihists = np.empty((recoils_nbins, len(sia_edges) - 1), dtype=np.float64) for i, e in enumerate(recoil_energies): vhist, _ = np.histogram(vac_sizes[i], bins=vac_edges, density=False) ihist, _ = np.histogram(sia_sizes[i], bins=sia_edges, density=False) vhist = vhist / nfiles[e] ihist = ihist / nfiles[e] vhists[i] = vhist ihists[i] = ihist def plot( hists: npt.NDArray[np.float64], edges: npt.NDArray[np.float64], max_size: int, bin_width: int, path: Path | None, ) -> None: # hists = np.ma.masked_where(hists == 0, hists) hists = np.ma.masked_less_equal(hists, 0.0) fig = plt.figure(figsize=(8, 8)) gs = GridSpec(1, 2, height_ratios=[1], width_ratios=[1, 0.05]) ax = fig.add_subplot(gs[0, 0]) ax.set_xlabel("Energy (keV)") ax.set_ylabel("Cluster size") im = ax.imshow( hists.T, origin="lower", aspect="auto", norm=mcolors.LogNorm() if log else None, cmap="viridis", extent=[0, recoils_nbins, 0, hists.shape[1]], ) xticks = np.arange(recoils_nbins) + 0.5 xlabels = [f"{e / 1000.0:<.1f}" for e in recoil_energies] ax.set_xticks(xticks) ax.set_xticklabels(xlabels) if bin_width == 1: ylabels = [str(i) for i in range(1, max_size + 1)] else: ylabels = [str(i) for i in range(1, 11)] + [ f"{i}-{i + bin_width - 1}" for i in range(11, max_size, bin_width) ] yticks = np.arange(len(edges) - 1) + 0.5 ax.set_yticks(yticks) ax.set_yticklabels(ylabels) cax = fig.add_subplot(gs[0, 1]) fig.colorbar(im, cax=cax, label="Counts per cascade") fig.tight_layout() if path is not None: fig.savefig(path, dpi=dpi) if show: plt.show() plt.close(fig) plot(ihists, sia_edges, max_sia, sias_binwidth, sias_plot_path) plot(vhists, vac_edges, max_vac, vacs_binwidth, vacs_plot_path)