"""This module contains the `Py2SRIM` class."""
# pylint: disable=too-many-lines
import os
import platform
import subprocess
import threading
import time
import warnings
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
import shutil
import numpy as np
from numpy import typing as npt
from irradiapy import config, dtypes, materials
from irradiapy.debris_database import DebrisDatabase
from irradiapy.materials.component import Component
from irradiapy.materials.element import Element
from irradiapy.recoilsdb import RecoilsDB
from irradiapy.srim.srimdb import SRIMDB
OS_NAME = platform.system()
if OS_NAME == "Windows":
import pygetwindow
import pywinauto
elif OS_NAME == "Linux":
missing_tools = [tool for tool in ("wine", "xdotool") if shutil.which(tool) is None]
if missing_tools:
warnings.warn(
"SRIM through Wine requires the following missing executable(s): "
+ ", ".join(missing_tools)
)
if not os.environ.get("DISPLAY"):
warnings.warn(
"SRIM automation requires an X11-compatible DISPLAY. "
"It works in a full X11 session, and may also work in a Wayland "
"session when SRIM/Wine runs through XWayland."
)
else:
warnings.warn(
"SRIM subpackage only supports Windows or Linux/Wine. "
f"{OS_NAME!r} not supported."
)
[docs]
@dataclass
class Py2SRIM:
"""Base class for running SRIM calculations from Python.
Attributes
----------
seed : int (default=0)
Seed for SRIM randomness.
root_dir: Path
Root directory where all calculations will be stored.
target : list[Component]
List of target components.
calculation : str
SRIM calculation.
srim_dir : Path (default=config.get_srim_dir())
Directory where SRIM is installed.
wineprefix : Path (default=Path(os.environ.get("WINEPREFIX", Path.home() / ".wine-srim2013")))
Wine prefix to use for running SRIM on Linux. By default, it uses the `WINEPREFIX`
environment variable if set, or `~/.wine-srim2013` otherwise.
wine_cmd : str (default=os.environ.get("WINE", "wine"))
Command to run Wine on Linux. By default, it uses the `WINE` environment variable if set,
or `wine` otherwise.
recoilsdb : RecoilsDB
Database to store all recoils collected from SPECTRA-PKA and SRIM calculations.
check_interval : float (default=0.2)
Interval to check for SRIM window/popups.
minimize_window : bool (default=False)
Whether to minimize the SRIM/TRIM window while calculations run.
plot_type : int (default=5)
Plot type during SRIM calculations. 5 for no plots (faster calculations).
xmin : float (default=0.0)
Minimum x for plots and depth-dependent means during SRIM calculations. Particularly
important for large targets, since SRIM divides it in 100 segments.
xmax : float (default=0.0)
Maximum x for plots and depth-dependent means during SRIM calculations. Particularly
important for large targets, since SRIM divides it in 100 segments.
0.0 for full target.
do_ranges : int (default=1)
Whether to save `RANGE.txt` file.
Disabling this might cause errors afterwards because of missing tables.
do_backscatt : int (default=1)
Whether to save `BACKSCAT.txt` file.
Disabling this might cause errors afterwards because of missing tables.
do_transmit : int (default=1)
Whether to save `TRANSMIT.txt` file.
Disabling this might cause errors afterwards because of missing tables.
do_sputtered : int (default=1)
Whether to save `SPUTTER.txt` file.
Disabling this might cause errors afterwards because of missing tables.
do_collisions : int (default=1)
Whether to save `COLLISON.txt` file.
Disabling this might cause errors afterwards because of missing tables.
exyz : float (default=0.0)
Whether to save ions position every time they loose `exyz` energy in the `EXYZ.txt` file.
autosave : int (default=0)
Autosave every this number of ions. 0 to disable.
"""
seed: int = 0
root_dir: Path = field(init=False)
target: list[Component] = field(init=False)
calculation: str = field(init=False)
srim_dir: Path = field(default_factory=config.get_srim_dir)
wineprefix: Path = field(
default_factory=lambda: Path(
os.environ.get("WINEPREFIX", Path.home() / ".wine-srim2013")
)
)
wine_cmd: str = field(default_factory=lambda: os.environ.get("WINE", "wine"))
recoilsdb: RecoilsDB = field(init=False)
check_interval: float = 0.2
minimize_window: bool = False
plot_type: int = 5
xmin: float = 0.0
xmax: float = 0.0
do_ranges: int = 1
do_backscatt: int = 1
do_transmit: int = 1
do_sputtered: int = 1
do_collisions: int = 1
exyz: float = 0.0
autosave: int = 0
reminders: int = field(default=0, init=False)
bragg: int = field(default=1, init=False)
# region SRIM files
def __generate_trimin(
self,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
) -> None:
"""Generates `TRIM.IN` file."""
nions = len(atomic_numbers)
atomic_mass = materials.ATOMIC_WEIGHT_BY_ATOMIC_NUMBER[atomic_numbers[0]]
energy = np.ceil(energies.max()) / 1e3
ncomponents = len(self.target)
nelements = sum(len(component.elements) for component in self.target)
if self.calculation == "quick":
calculation = 4
elif self.calculation == "full":
calculation = 5
else:
calculation = 7
trimin_path = self.srim_dir / "TRIM.IN"
with open(trimin_path, "w", encoding="ascii", newline="\r\n") as file:
file.write("TRIM.IN file generated by irradiapy.\n")
# Ion parameters
file.write(
(
"ion Z, A, energy, angle, number of ions, Bragg correction, autosave\n"
f"{atomic_numbers[0]} {atomic_mass} {energy} 0 "
f"{nions} {self.bragg} {self.autosave}\n"
)
)
# Calculation type and seed
file.write(
(
"calculation, seed, reminders\n"
f"{calculation} {self.seed} {self.reminders}\n"
)
)
# Output files
file.write(
(
"Diskfiles (0=no,1=yes): Ranges, Backscatt, Transmit, Sputtered, "
"Collisions(1=Ion;2=Ion+Recoils), Special EXYZ.txt file\n"
f"{self.do_ranges} {self.do_backscatt} {self.do_transmit} {self.do_sputtered} "
f"{self.do_collisions} {self.exyz}\n"
)
)
# Target material
file.write(
(
"target material, number of elements, layers\n"
f'"irradiapy" {nelements} {ncomponents}\n'
)
)
# Plot settings
file.write(
(
"PlotType (0-5); Plot Depths: Xmin, Xmax(Ang.) [=0 0 for Viewing Full Target]\n"
f"{self.plot_type} {self.xmin} {self.xmax}\n"
)
)
# Target elements
string = "target element, Z, mass\n"
for i, component in enumerate(self.target):
for j, element in enumerate(component.elements):
string += (
f"Atom {i*len(component.elements)+j+1} = {element.symbol} "
f"= {element.atomic_number} {element.atomic_weight}\n"
)
file.write(string)
# Target layers
string = "layer name, width density elements\nnumb. desc. (ang) (g/cm3) stoich.\n"
layers_info = []
for i, component in enumerate(self.target):
prev_stoichs = [0.0] * sum(len(l.elements) for l in self.target[:i])
next_stoichs = [0.0] * sum(
len(l.elements) for l in self.target[i + 1 :]
)
layer_info = f'{i} "layer{i}" {component.width} {component.density} '
layer_info += " ".join(
map(str, prev_stoichs + list(component.stoichs) + next_stoichs)
)
layers_info.append(layer_info)
file.write(string + "\n".join(layers_info) + "\n")
# Phases
string = "phases\n"
string += " ".join(map(str, (component.phase for component in self.target)))
file.write(string + "\n")
# Bragg
string = "target compound corrections (bragg)\n"
string += " ".join(
map(str, (component.srim_bragg for component in self.target))
)
file.write(string + "\n")
# Displacement energies
string = "displacement energies (eV)\n"
string += " ".join(
map(
str,
(
element.ed_avr
for component in self.target
for element in component.elements
),
)
)
file.write(string + "\n")
# Lattice energies
string = "lattice binding energies (eV)\n"
string += " ".join(
map(
str,
(
element.srim_el
for component in self.target
for element in component.elements
),
)
)
file.write(string + "\n")
# Binding energies
string = "surface binding energies (eV)\n"
string += " ".join(
map(
str,
(
element.srim_es
for component in self.target
for element in component.elements
),
)
)
file.write(string + "\n")
# Stopping power version
file.write("Stopping Power Version\n0\n")
def __generate_trimdat(
self,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
depths: npt.NDArray[np.float64] | None = None,
ys: npt.NDArray[np.float64] | None = None,
zs: npt.NDArray[np.float64] | None = None,
cosxs: npt.NDArray[np.float64] | None = None,
cosys: npt.NDArray[np.float64] | None = None,
coszs: npt.NDArray[np.float64] | None = None,
) -> npt.NDArray[np.float64]:
"""Generates `TRIM.DAT` file.
Parameters
----------
atomic_numbers : npt.NDArray[np.int64]
Atomic numbers.
energies : npt.NDArray[np.float64]
Energies.
depths : npt.NDArray[np.float64] | None (default=None)
Depths.
ys : npt.NDArray[np.float64] | None (default=None)
Y positions.
zs : npt.NDArray[np.float64] | None (default=None)
Z positions.
cosxs : npt.NDArray[np.float64] | None (default=None)
X directions.
cosys : npt.NDArray[np.float64] | None (default=None)
Y directions.
coszs : npt.NDArray[np.float64] | None (default=None)
Z directions.
Returns
-------
npt.NDArray[np.float64]
`TRIM.DAT` data.
"""
trimdat_path = self.srim_dir / "TRIM.DAT"
nions = atomic_numbers.size
if depths is None:
depths = np.zeros(nions)
if ys is None:
ys = np.zeros(nions)
if zs is None:
zs = np.zeros(nions)
if cosxs is None:
cosxs = np.ones(nions)
if cosys is None:
cosys = np.zeros(nions)
if coszs is None:
coszs = np.zeros(nions)
names = np.array([f"{i:06d}" for i in range(1, nions + 1)], dtype=str)
with open(trimdat_path, "w", encoding="ascii", newline="\r\n") as file:
file.write("<TRIM>\n" * 9)
file.write("Name atomic_number E x y z cosx cosy cosz\n")
for i in range(nions):
file.write(
(
f"{names[i]} {atomic_numbers[i]} {energies[i]} {depths[i]} {ys[i]} "
f"{zs[i]} {cosxs[i]} {cosys[i]} {coszs[i]}\n"
)
)
data = np.empty(nions, dtype=dtypes.trimdat)
for i in range(nions):
data[i]["name"] = names[i]
data[i]["atomic_number"] = atomic_numbers[i]
data[i]["energy"] = energies[i]
data[i]["pos"] = np.array([depths[i], ys[i], zs[i]])
data[i]["dir"] = np.array([cosxs[i], cosys[i], coszs[i]])
return data
def __generate_trimauto(self) -> None:
"""Generates `TRIMAUTO` file."""
with open(
self.srim_dir / "TRIMAUTO", "w", encoding="ascii", newline="\r\n"
) as file:
file.write("1\n\nCheck TRIMAUTO.txt for details.\n")
# endregion
# region Checks
def __check_transmit(self, srimdb: SRIMDB) -> None:
"""Checks if there are transmitted ions in the database."""
transmit_rows = list(srimdb.read("transmit", "atom_numb, energy"))
if transmit_rows:
msg = ", ".join(f"({row[0]}, {row[1]:.2g})" for row in transmit_rows)
raise RuntimeError(
"SRIM ions ended up transmitted. Consider increasing the "
"effective target width. (Z, E (eV)) = " + msg
)
def __check_backscat(self, srimdb: SRIMDB) -> None:
"""Checks if there are backscattered ions in the database."""
backscat_rows = list(srimdb.read("backscat", "atom_numb, energy"))
if backscat_rows:
msg = ", ".join(f"({row[0]}, {row[1]:.2g})" for row in backscat_rows)
raise RuntimeError(
"SRIM ions ended up backscattered. Consider increasing the "
"effective target width. (Z, E (eV)) = " + msg
)
# endregion
# region Run
@staticmethod
def __should_run_srim_for_recoil(
recoil: Element,
component: Component,
recoil_energy: float,
max_energy_rel: float,
fp_energy_abs: float,
debris_database: DebrisDatabase,
) -> bool:
"""Return whether a recoil should be sent to SRIM."""
matching_datasets = debris_database.matching_datasets(
recoil=recoil,
component=component,
)
if matching_datasets:
max_dataset_energy = max(dataset.max_energy for dataset in matching_datasets)
# Recoils above the relative dataset limit are sent to SRIM.
if recoil_energy > max_dataset_energy * max_energy_rel:
return True
return False
# Any unmatched recoil below fp_energy_abs is not sent to SRIM;
# Frenkel pairs are placed instead.
if recoil_energy < fp_energy_abs:
return False
# Send unmatched recoils at or above fp_energy_abs to SRIM.
return True
def __create_srimdb(
self,
path: Path,
target: list[Component],
calculation: str,
) -> SRIMDB:
"""Creates a SRIMDB instance."""
srimdb = SRIMDB(
path=path,
target=target,
calculation=calculation,
seed=self.seed,
plot_type=self.plot_type,
xmin=self.xmin,
xmax=self.xmax,
do_ranges=self.do_ranges,
do_backscatt=self.do_backscatt,
do_transmit=self.do_transmit,
do_sputtered=self.do_sputtered,
do_collisions=self.do_collisions,
exyz=self.exyz,
autosave=self.autosave,
)
return srimdb
def __srim_env(self) -> dict[str, str]:
"""Return the environment used to run SRIM/TRIM through Wine."""
env = os.environ.copy()
if OS_NAME == "Linux":
env["WINEPREFIX"] = str(
Path(env.get("WINEPREFIX", Path.home() / ".wine-srim2013"))
)
env.pop("WINEARCH", None)
# Avoid comma decimal formatting leaking into Wine/SRIM.
env.setdefault("LC_NUMERIC", "C")
return env
def __trim_command(self) -> list[str]:
"""Return the platform-specific TRIM launch command."""
if OS_NAME == "Windows":
return [str(self.srim_dir / "TRIM.exe")]
if OS_NAME == "Linux":
wine = os.environ.get("WINE", "wine")
return [wine, "TRIM.exe"]
raise RuntimeError(f"Unsupported operating system for SRIM: {OS_NAME!r}")
def __xdotool_search(self, title: str) -> str | None:
"""Return the first X11 window id matching a title, or None."""
result = subprocess.run(
["xdotool", "search", "--name", title],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
check=False,
env=self.__srim_env(),
)
if result.returncode != 0:
return None
for line in result.stdout.splitlines():
window_id = line.strip()
if window_id:
return window_id
return None
def __wineserver_wait(self) -> None:
"""Wait until Wine has no pending work for this prefix."""
if OS_NAME != "Linux":
return
wineserver = shutil.which("wineserver")
if wineserver is None:
return
subprocess.run(
[wineserver, "-w"],
env=self.__srim_env(),
check=False,
)
def __minimize_and_handle_popup(
self,
stop_event: threading.Event,
minimize_window: bool,
) -> None:
"""Optionally minimize SRIM/TRIM and dismiss the final TRIM popup."""
# pylint: disable=protected-access
window_title = "SRIM-2013.00"
popup_title = "End of TRIM.DAT calculation"
if OS_NAME == "Windows":
if minimize_window:
while not stop_event.is_set():
windows = pygetwindow.getWindowsWithTitle(window_title)
if windows:
window = windows[0]
app = pywinauto.Application().connect(handle=window._hWnd)
app.window(handle=window._hWnd).minimize()
break
time.sleep(self.check_interval)
if self.calculation != "quick":
while not stop_event.is_set():
popups = pygetwindow.getWindowsWithTitle(popup_title)
if popups:
popup = popups[0]
app = pywinauto.Application().connect(handle=popup._hWnd)
app.window(handle=popup._hWnd).send_keystrokes("{ENTER}")
break
time.sleep(self.check_interval)
return
if OS_NAME == "Linux":
if minimize_window:
while not stop_event.is_set():
window_id = self.__xdotool_search(window_title)
if window_id is not None:
subprocess.run(
["xdotool", "windowminimize", window_id],
check=False,
env=self.__srim_env(),
)
break
time.sleep(self.check_interval)
if self.calculation != "quick":
while not stop_event.is_set():
popup_id = self.__xdotool_search(popup_title)
if popup_id is not None:
subprocess.run(
["xdotool", "windowactivate", popup_id],
check=False,
env=self.__srim_env(),
)
subprocess.run(
["xdotool", "key", "Return"],
check=False,
env=self.__srim_env(),
)
break
time.sleep(self.check_interval)
return
raise RuntimeError(
f"Unsupported operating system for SRIM automation: {OS_NAME!r}"
)
def __component_from_depth(self, depth: float) -> Component:
"""Return the target component at given depth."""
current_depth = 0.0
for component in self.target:
if component.width is None:
raise ValueError("All target components must have a width.")
next_depth = current_depth + component.width
if current_depth <= depth <= next_depth:
return component
current_depth = next_depth
raise ValueError(f"depth={depth} is out of bounds of the target.")
def __srim_mask(
self,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
depths: npt.NDArray[np.float64],
max_energy_rel: float,
fp_energy_abs: float,
debris_database: DebrisDatabase,
) -> npt.NDArray[np.bool_]:
"""Return the mask of recoils that should be sent to SRIM."""
mask = np.zeros(atomic_numbers.shape, dtype=bool)
for i, atomic_number in enumerate(atomic_numbers):
recoil = materials.ELEMENT_BY_ATOMIC_NUMBER[int(atomic_number)]
component = self.__component_from_depth(depths[i])
mask[i] = self.__should_run_srim_for_recoil(
recoil=recoil,
component=component,
recoil_energy=energies[i],
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
return mask
@staticmethod
def __group_ions_by_atomic_number(
mask: npt.NDArray[np.bool_],
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
**extra_fields: npt.NDArray[np.float64],
) -> dict[int, dict[str, npt.NDArray | int]]:
"""Group selected ions by atomic number.
Parameters
----------
mask : npt.NDArray[np.bool_]
Boolean mask indicating which ions to include.
atomic_numbers : npt.NDArray[np.int64]
Atomic number of each ion.
energies : npt.NDArray[np.float64]
Energy (eV) of each ion.
**extra_fields : npt.NDArray[np.float64]
Any extra per-ion arrays to propagate (depths, ys, zs, cosxs, ...).
Returns
-------
dict[int, dict[str, npt.NDArray | int]]
A dictionary keyed by atomic number. Each value is another dict with:
'atomic_numbers' : np.ndarray
'energies' : np.ndarray
'<field name>' : np.ndarray # for each entry in extra_fields
'nions' : int
"""
ions: dict[int, dict[str, npt.NDArray | int]] = {}
unique_atomic_numbers = np.unique(atomic_numbers[mask])
for atomic_number in unique_atomic_numbers:
atomic_mask = (atomic_numbers == atomic_number) & mask
batch: dict[str, npt.NDArray | int] = {
"atomic_numbers": atomic_numbers[atomic_mask],
"energies": energies[atomic_mask],
}
for name, arr in extra_fields.items():
batch[name] = arr[atomic_mask]
batch["nions"] = batch["atomic_numbers"].size
ions[int(atomic_number)] = batch
return ions
def __tree2path_db(
self,
tree: tuple[int, ...],
create_parent_dir: bool,
) -> Path:
"""Converts a tree tuple into a database path.
Parameters
----------
tree : tuple[int, ...]
Tree tuple.
create_parent_dir : bool
Whether to create the parent directory.
Returns
-------
Path
Database path.
"""
branch_dir = self.root_dir.joinpath(
*(str(atomic_number) for atomic_number in tree)
)
if create_parent_dir:
branch_dir.mkdir(parents=True, exist_ok=True)
path = branch_dir / "srim.db"
return path
def __run(
self,
srimdb: SRIMDB,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
depths: npt.NDArray[np.float64] | None,
ys: npt.NDArray[np.float64] | None,
zs: npt.NDArray[np.float64] | None,
cosxs: npt.NDArray[np.float64] | None,
cosys: npt.NDArray[np.float64] | None,
coszs: npt.NDArray[np.float64] | None,
fail_on_backscat: bool,
fail_on_transmit: bool,
minimize_window: bool,
ignore_32bit_warning: bool = True,
) -> None:
"""Run the SRIM simulation.
Parameters
----------
srimdb : SRIMDB
SRIM database.
atomic_numbers : npt.NDArray[np.int64]
Ion atomic numbers.
energies : npt.NDArray[np.float64]
Ion energies.
depths : npt.NDArray[np.float64] | None
Ion initial depths.
ys : npt.NDArray[np.float64] | None
Ion initial y positions.
zs : npt.NDArray[np.float64] | None
Ion initial z positions.
cosxs : npt.NDArray[np.float64] | None
Ion initial x directions.
cosys : npt.NDArray[np.float64] | None
Ion initial y directions.
coszs : npt.NDArray[np.float64] | None
Ion initial z directions.
fail_on_backscat : bool
Whether to fail if there are backscattered ions.
fail_on_transmit : bool
Whether to fail if there are transmitted ions.
minimize_window : bool
Whether to minimize the SRIM window while SRIM simulations run.
"""
if ignore_32bit_warning:
warnings.filterwarnings(
"ignore",
category=UserWarning,
message="32-bit application should be automated using 32-bit Python",
)
if srimdb.table_exists("trimdat"):
raise RuntimeError(
(
f"The database {srimdb.path} is already populated "
"with data from another simulation, use another one."
)
)
# Generate input files
self.__generate_trimauto()
self.__generate_trimin(atomic_numbers, energies)
self.__generate_trimdat(
atomic_numbers,
energies,
depths=depths,
ys=ys,
zs=zs,
cosxs=cosxs,
cosys=cosys,
coszs=coszs,
)
# Run SRIM
print(f"Running {len(atomic_numbers)} SRIM ions")
stop_event = threading.Event()
window_thread = threading.Thread(
target=self.__minimize_and_handle_popup,
args=(stop_event, minimize_window),
daemon=True,
)
try:
window_thread.start()
subprocess.run(
self.__trim_command(),
cwd=self.srim_dir,
env=self.__srim_env(),
check=True,
)
self.__wineserver_wait()
finally:
stop_event.set()
window_thread.join(timeout=5.0)
# Append output files
srimdb.append_output()
srimdb.commit()
if fail_on_transmit:
self.__check_transmit(srimdb)
if fail_on_backscat:
self.__check_backscat(srimdb)
srimdb.optimize()
def __collect_recoils(
self,
max_energy_rel: float,
fp_energy_abs: float,
debris_database: DebrisDatabase,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
depths: npt.NDArray[np.float64],
ys: npt.NDArray[np.float64],
zs: npt.NDArray[np.float64],
cosxs: npt.NDArray[np.float64],
cosys: npt.NDArray[np.float64],
coszs: npt.NDArray[np.float64],
) -> None:
"""Collect all recoils from all SRIM databases into a single database taking into account
their hierarchy.
Parameters
----------
max_energy_rel : float
Relative recoil energy threshold for matching MD debris datasets. Recoils above
``max_dataset_energy * max_energy_rel`` are sent to SRIM, where
``max_dataset_energy`` is the highest recoil energy available in the matching
dataset data. Must be at least 1.0.
fp_energy_abs : float
Absolute recoil energy below which unmatched recoils are represented by Frenkel
pairs instead of being sent to SRIM, in eV.
atomic_numbers : npt.NDArray[np.int64]
Ion atomic numbers.
energies : npt.NDArray[np.float64]
Ion energies.
depths : npt.NDArray[np.float64]
Ion initial depths.
ys : npt.NDArray[np.float64]
Ion initial y positions.
zs : npt.NDArray[np.float64]
Ion initial z positions.
cosxs : npt.NDArray[np.float64]
Ion initial x directions.
cosys : npt.NDArray[np.float64]
Ion initial y directions.
coszs : npt.NDArray[np.float64]
Ion initial z directions.
"""
# For each SRIM database, keep track of how many ions we have already consumed.
# Key is the full path to the srim.db file.
srim_ion_counters: dict[Path, int] = defaultdict(int)
def collect_recoils_from_srim(
tree: tuple[int, ...],
ion_numb: int,
event: int,
) -> None:
"""Collect recoils from a single SRIM ion and recurse on high-energy recoils.
Parameters
----------
tree : tuple[int, ...]
Sequence of atomic numbers describing the SRIM path, e.g. (26,), (26, 76), ...
The corresponding SRIM DB is at:
root_dir / '26' / '76' / ... / 'srim.db'
ion_numb : int
Ion number inside this SRIM DB (ion_numb column).
event : int
Event index, 1-based. Here it is the original ion index + 1 from Py2SRIM.run.
"""
# Path to current SRIM DB for this tree
branch_path = self.__tree2path_db(tree, create_parent_dir=False)
# If this SRIM DB does not exist (e.g., max_srim_iters prevented creation), stop here.
if not branch_path.exists():
return
srimdb_branch = self.__create_srimdb(
path=branch_path,
target=None,
calculation=None,
)
collisions = list(
srimdb_branch.read(
table="collision",
what="depth, y, z, cosx, cosy, cosz, recoil_energy, atom_hit",
conditions=f"WHERE ion_numb={ion_numb}",
)
)
srimdb_branch.close()
for (
depth,
y,
z,
cosx,
cosy,
cosz,
recoil_energy,
atom_hit,
) in collisions:
atomic_number = int(materials.ATOMIC_NUMBER_BY_SYMBOL[atom_hit])
component = self.__component_from_depth(float(depth))
should_run = self.__should_run_srim_for_recoil(
recoil=materials.ELEMENT_BY_ATOMIC_NUMBER[atomic_number],
component=component,
recoil_energy=float(recoil_energy),
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
if not should_run:
# This recoil is terminal; store it.
self.recoilsdb.insert_recoil(
event=event,
atomic_number=atomic_number,
recoil_energy=recoil_energy,
x=depth,
y=y,
z=z,
cosx=cosx,
cosy=cosy,
cosz=cosz,
)
continue
# Recoil selected for SRIM: try to follow to the next SRIM level.
leaf_tree = (*tree, atomic_number)
leaf_db_path = self.__tree2path_db(
leaf_tree,
create_parent_dir=False,
)
# If the child SRIM DB does not exist (e.g., limited by max_srim_iters),
# treat as terminal.
if not leaf_db_path.exists():
self.recoilsdb.insert_recoil(
event=event,
atomic_number=atomic_number,
recoil_energy=recoil_energy,
x=depth,
y=y,
z=z,
cosx=cosx,
cosy=cosy,
cosz=cosz,
)
continue
# Map this collision to the correct ion in the child SRIM DB.
child_ion_numb = srim_ion_counters[leaf_db_path] + 1
srim_ion_counters[leaf_db_path] = child_ion_numb
collect_recoils_from_srim(
tree=leaf_tree,
ion_numb=child_ion_numb,
event=event,
)
# Start from the primary ions given to Py2SRIM.run
nions = atomic_numbers.size
for i in range(nions):
event = int(i + 1)
ion_atomic_number = int(atomic_numbers[i])
ion_energy = float(energies[i])
depth = float(depths[i])
y = float(ys[i])
z = float(zs[i])
cosx = float(cosxs[i])
cosy = float(cosys[i])
cosz = float(coszs[i])
component = self.__component_from_depth(depth)
should_run = self.__should_run_srim_for_recoil(
recoil=materials.ELEMENT_BY_ATOMIC_NUMBER[ion_atomic_number],
component=component,
recoil_energy=ion_energy,
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
# Terminal primary ion -> store into final DB as terminal recoil.
if not should_run:
self.recoilsdb.insert_recoil(
event=event,
atomic_number=ion_atomic_number,
recoil_energy=ion_energy,
x=depth,
y=y,
z=z,
cosx=cosx,
cosy=cosy,
cosz=cosz,
)
continue
# High-energy primary ion: follow SRIM cascades starting at atomic_number/srim.db.
branch_path = self.__tree2path_db(
(ion_atomic_number,),
create_parent_dir=False,
)
# No SRIM data for this ion: treat as terminal.
# Likely due to max_srim_iters or an aborted run.
if not branch_path.exists():
self.recoilsdb.insert_recoil(
event=event,
atomic_number=ion_atomic_number,
recoil_energy=ion_energy,
x=depth,
y=y,
z=z,
cosx=cosx,
cosy=cosy,
cosz=cosz,
)
continue
# Ion index in this first-level SRIM DB.
ion_numb = srim_ion_counters[branch_path] + 1
srim_ion_counters[branch_path] = ion_numb
collect_recoils_from_srim(
tree=(ion_atomic_number,),
ion_numb=ion_numb,
event=event,
)
def __collect_ions_vacs(
self,
max_energy_rel: float,
fp_energy_abs: float,
debris_database: DebrisDatabase,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
depths: npt.NDArray[np.float64],
) -> None:
"""Collect all ions and vacancies from all SRIM databases into a single database
taking into account their hierarchy.
"""
# For each SRIM database, keep track of how many ions we have already consumed.
# Key is the full path to the srim.db file.
srim_ion_counters: dict[Path, int] = defaultdict(int)
def collect_ions_vacs_from_srim(
tree: tuple[int, ...],
ion_numb: int,
event: int,
) -> None:
"""Collect ions and vacancies from a single SRIM ion and recurse on high-energy
recoils.
Parameters
----------
tree : tuple[int, ...]
Sequence of atomic numbers describing the SRIM path, e.g. (26,), (26, 76), ...
The corresponding SRIM DB is at:
root_dir / '26' / '76' / ... / 'srim.db'
ion_numb : int
Ion number inside this SRIM DB (ion_numb column).
event : int
Event index, 1-based (ion index from Py2SRIM.run).
"""
# Path to current SRIM DB for this tree
branch_path = self.__tree2path_db(tree, create_parent_dir=False)
# If this SRIM DB does not exist (e.g., max_srim_iters prevented creation), stop.
if not branch_path.exists():
return
srimdb_branch = self.__create_srimdb(
path=branch_path,
target=None,
calculation=None,
)
# Initial ion position: potential vacancy at the initial position
# Indicate with negative atomic number to distinguish from ion
trimdat = list(
srimdb_branch.read(
table="trimdat",
what="depth, y, z, atom_numb",
conditions=f"WHERE ion_numb={ion_numb}",
)
)
self.recoilsdb.insert_ion_vac(
event=event,
atom_numb=-int(trimdat[0][3]),
x=trimdat[0][0],
y=trimdat[0][1],
z=trimdat[0][2],
)
# Final ion position is saved if not transmitted/backscattered
range3d = list(
srimdb_branch.read(
table="range3d",
what="depth, y, z",
conditions=f"WHERE ion_numb={ion_numb}",
)
)
if range3d:
# If present, the ion stopped inside the target.
self.recoilsdb.insert_ion_vac(
event=event,
atom_numb=int(trimdat[0][3]),
x=range3d[0][0],
y=range3d[0][1],
z=range3d[0][2],
)
# Recoils for this ion_numb to decide further SRIM levels
collision_rows = list(
srimdb_branch.read(
table="collision",
what="depth, recoil_energy, atom_hit",
conditions=f"WHERE ion_numb={ion_numb}",
)
)
srimdb_branch.close()
# Recurse on high-energy recoils
for depth, recoil_energy, atom_hit in collision_rows:
recoil_energy = float(recoil_energy)
atomic_number = int(materials.ATOMIC_NUMBER_BY_SYMBOL[atom_hit])
component = self.__component_from_depth(float(depth))
should_run = self.__should_run_srim_for_recoil(
recoil=materials.ELEMENT_BY_ATOMIC_NUMBER[atomic_number],
component=component,
recoil_energy=recoil_energy,
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
if not should_run:
# Terminal for ions/vacs purposes
continue
leaf_tree = (*tree, atomic_number)
leaf_db_path = self.__tree2path_db(
leaf_tree,
create_parent_dir=False,
)
# If the child SRIM DB does not exist (e.g., limited by max_srim_iters),
# treat as terminal.
if not leaf_db_path.exists():
continue
# Map this recoil to the correct ion in the child SRIM DB.
child_ion_numb = srim_ion_counters[leaf_db_path] + 1
srim_ion_counters[leaf_db_path] = child_ion_numb
collect_ions_vacs_from_srim(
tree=leaf_tree,
ion_numb=child_ion_numb,
event=event,
)
# Start from the primary ions given to Py2SRIM.run
nions = atomic_numbers.size
for i in range(nions):
event = int(i + 1)
ion_atomic_number = int(atomic_numbers[i])
ion_energy = float(energies[i])
component = self.__component_from_depth(depths[i])
should_run = self.__should_run_srim_for_recoil(
recoil=materials.ELEMENT_BY_ATOMIC_NUMBER[ion_atomic_number],
component=component,
recoil_energy=ion_energy,
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
# Terminal primary ions never enter a SRIM branch, so no ions/vacs are collected.
if not should_run:
continue
# High-energy primary ion: follow SRIM cascades starting at Z/srim.db.
branch_path = self.__tree2path_db(
tree=(ion_atomic_number,),
create_parent_dir=False,
)
# No SRIM data for this ion: treat as terminal.
# Likely due to max_srim_iters or aborted runs.
if not branch_path.exists():
continue
# Ion index in this first-level SRIM DB.
ion_numb = srim_ion_counters[branch_path] + 1
srim_ion_counters[branch_path] = ion_numb
collect_ions_vacs_from_srim(
tree=(ion_atomic_number,),
ion_numb=ion_numb,
event=event,
)
[docs]
def run(
self,
root_dir: Path,
target: list[Component],
calculation: str,
atomic_numbers: npt.NDArray[np.int64],
energies: npt.NDArray[np.float64],
depths: npt.NDArray[np.float64],
ys: npt.NDArray[np.float64],
zs: npt.NDArray[np.float64],
cosxs: npt.NDArray[np.float64],
cosys: npt.NDArray[np.float64],
coszs: npt.NDArray[np.float64],
max_energy_rel: float,
max_srim_iters: int,
fail_on_transmit: bool,
fail_on_backscatt: bool,
fp_energy_abs: float = 1e3,
ignore_32bit_warning: bool = True,
minimize_window: bool | None = None,
) -> RecoilsDB:
"""Run SRIM iteratively, creating a folder tree driven by a recoil-energy threshold.
* Group ions by atomic number, keeping only those that should be sent to SRIM.
* Create a directory tree under ``root_dir`` where each branch holds a
``srim.db`` file.
* Run SRIM once per branch.
* Recursively spawn new SRIM runs for recoils from the ``collision`` table whose
energy is above the relative maximum of their matching MD debris datasets.
* Stop at depth ``max_srim_iters``. Depth is ``len(tree)``; for example,
``(26, 76, 26)`` has depth 3.
Parameters
----------
root_dir: Path
Root directory where all calculations will be stored.
target : list[Component]
SRIM target.
calculation : str
SRIM calculation.
atomic_numbers : npt.NDArray[np.int64]
Ion atomic numbers.
energies : npt.NDArray[np.float64]
Ion energies.
depths : npt.NDArray[np.float64]
Ion initial depths.
ys : npt.NDArray[np.float64]
Ion initial y positions.
zs : npt.NDArray[np.float64]
Ion initial z positions.
cosxs : npt.NDArray[np.float64]
Ion initial x directions.
cosys : npt.NDArray[np.float64]
Ion initial y directions.
coszs : npt.NDArray[np.float64]
Ion initial z directions.
max_energy_rel : float
Relative recoil energy threshold for matching MD debris datasets. Recoils above
``max_dataset_energy * max_energy_rel`` spawn further SRIM branches, where
``max_dataset_energy`` is the highest recoil energy available in the matching
dataset data. Must be at least 1.0.
max_srim_iters : int
Maximum number of SRIM iterations.
fail_on_transmit : bool
If True, raise if any ion is transmitted (TRANSMIT.txt non-empty).
fail_on_backscatt : bool
If True, raise if any ion is backscattered (BACKSCAT.txt non-empty).
fp_energy_abs : float, optional (default=1e3)
Absolute recoil energy below which unmatched recoils are represented by Frenkel
pairs instead of being sent to SRIM, in eV.
ignore_32bit_warning : bool (default=True)
Whether to ignore the 32-bit warning when using 32-bit SRIM with 64-bit Python.
minimize_window : bool | None (default=None)
Whether to minimize the SRIM/TRIM window while calculations run. If None, use
the instance-level ``minimize_window`` setting. The default instance setting is False.
Returns
-------
RecoilsDB
Database with all recoils collected.
"""
self.root_dir = root_dir
self.target = target
self.calculation = calculation
if minimize_window is None:
minimize_window = self.minimize_window
if not isinstance(minimize_window, bool):
raise TypeError("minimize_window must be a bool")
self.minimize_window = minimize_window
if max_energy_rel < 1.0:
raise ValueError("max_energy_rel must be at least 1.0")
if max_srim_iters < 1:
raise ValueError("max_srim_iters must be at least 1")
if self.calculation not in {"quick", "full", "mono"}:
raise ValueError("calculation must be 'quick', 'full' or 'mono'")
self.root_dir.mkdir(parents=True, exist_ok=True)
self.recoilsdb = RecoilsDB(self.root_dir / "recoils.db")
debris_database = config.get_debris_database()
def run_branch(
tree: tuple[int, ...],
batch: dict[str, npt.NDArray | int],
) -> None:
"""Run SRIM for one branch in the tree and write root_dir/.../srim.db."""
path = self.__tree2path_db(tree, True)
srimdb = self.__create_srimdb(
path=path,
target=self.target,
calculation=self.calculation,
)
self.__run(
srimdb=srimdb,
atomic_numbers=batch["atomic_numbers"],
energies=batch["energies"],
depths=batch["depths"],
ys=batch["ys"],
zs=batch["zs"],
cosxs=batch["cosxs"],
cosys=batch["cosys"],
coszs=batch["coszs"],
fail_on_backscat=fail_on_backscatt,
fail_on_transmit=fail_on_transmit,
minimize_window=minimize_window,
ignore_32bit_warning=ignore_32bit_warning,
)
srimdb.close()
def recurse(tree: tuple[int, ...]) -> None:
"""Process one srim.db and recursively spawn branches."""
path = self.__tree2path_db(tree, False)
# If the database does not exist, there are no recoils to process.
if not path.exists():
return
# Collect recoils from this branch for the next iteration
srimdb_branch = self.__create_srimdb(
path=path,
target=None,
calculation=None,
)
collisions = list(
srimdb_branch.read(
table="collision",
what="depth, y, z, cosx, cosy, cosz, recoil_energy, atom_hit",
conditions="ORDER BY ion_numb",
)
)
srimdb_branch.close()
if not collisions:
return
branch_depths = np.array([row[0] for row in collisions], dtype=np.float64)
branch_ys = np.array([row[1] for row in collisions], dtype=np.float64)
branch_zs = np.array([row[2] for row in collisions], dtype=np.float64)
branch_cosxs = np.array([row[3] for row in collisions], dtype=np.float64)
branch_cosys = np.array([row[4] for row in collisions], dtype=np.float64)
branch_coszs = np.array([row[5] for row in collisions], dtype=np.float64)
branch_energies = np.array([row[6] for row in collisions], dtype=np.float64)
branch_atomic_numbers = np.array(
[materials.ATOMIC_NUMBER_BY_SYMBOL[row[7]] for row in collisions],
dtype=np.int32,
)
srim_mask = self.__srim_mask(
atomic_numbers=branch_atomic_numbers,
energies=branch_energies,
depths=branch_depths,
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
leaf_batches = self.__group_ions_by_atomic_number(
mask=srim_mask,
atomic_numbers=branch_atomic_numbers,
energies=branch_energies,
depths=branch_depths,
ys=branch_ys,
zs=branch_zs,
cosxs=branch_cosxs,
cosys=branch_cosys,
coszs=branch_coszs,
)
# No recoils selected for SRIM.
if not leaf_batches:
return
for atomic_number, batch in leaf_batches.items():
leaf_tree = (*tree, int(atomic_number))
depth_iter = len(leaf_tree)
if depth_iter >= max_srim_iters:
tree_str = "/".join(str(z) for z in leaf_tree)
print(
f"Skipping SRIM for Z={atomic_number} (path {tree_str}): "
f"reached max_srim_iters={max_srim_iters}"
)
continue
run_branch(leaf_tree, batch)
recurse(leaf_tree)
srim_mask = self.__srim_mask(
atomic_numbers=atomic_numbers,
energies=energies,
depths=depths,
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
)
batches = self.__group_ions_by_atomic_number(
mask=srim_mask,
atomic_numbers=atomic_numbers,
energies=energies,
depths=depths,
ys=ys,
zs=zs,
cosxs=cosxs,
cosys=cosys,
coszs=coszs,
)
if not batches:
print("No ions selected for SRIM; nothing to run.")
else:
for atomic_number, batch in batches.items():
tree = (int(atomic_number),)
run_branch(tree, batch)
if max_srim_iters > 1:
recurse(tree)
# Collect all recoils data into a single database
self.__collect_recoils(
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
atomic_numbers=atomic_numbers,
energies=energies,
depths=depths,
ys=ys,
zs=zs,
cosxs=cosxs,
cosys=cosys,
coszs=coszs,
)
# Collect all ions and vacancies into a single database
self.__collect_ions_vacs(
max_energy_rel=max_energy_rel,
fp_energy_abs=fp_energy_abs,
debris_database=debris_database,
atomic_numbers=atomic_numbers,
energies=energies,
depths=depths,
)
self.recoilsdb.save_target(self.target)
self.recoilsdb.commit()
return self.recoilsdb
# endregion