Source code for irradiapy.srim.srimdb

"""This module contains the `SRIMDB` class."""

# pylint: disable=too-many-lines,protected-access
import os
import platform
import sqlite3
import subprocess
import threading
import time
import traceback
import warnings
from dataclasses import dataclass, field
from pathlib import Path
from types import TracebackType
from typing import Callable

import numpy as np
from numpy import typing as npt

from irradiapy import config, dtypes, materials
from irradiapy.srim.ofiles.backscat import Backscat
from irradiapy.srim.ofiles.collision import Collision
from irradiapy.srim.ofiles.e2recoil import E2Recoil
from irradiapy.srim.ofiles.ioniz import Ioniz
from irradiapy.srim.ofiles.lateral import Lateral
from irradiapy.srim.ofiles.novac import Novac
from irradiapy.srim.ofiles.phonon import Phonon
from irradiapy.srim.ofiles.range import Range
from irradiapy.srim.ofiles.range3d import Range3D
from irradiapy.srim.ofiles.sputter import Sputter
from irradiapy.srim.ofiles.subcollision import Subcollision
from irradiapy.srim.ofiles.transmit import Transmit
from irradiapy.srim.ofiles.trimdat import Trimdat
from irradiapy.srim.ofiles.vacancy import Vacancy
from irradiapy.srim.target.element import Element
from irradiapy.srim.target.layer import Layer
from irradiapy.srim.target.target import Target

platform = platform.system()
if platform == "Windows":
    import pygetwindow
    import pywinauto
else:
    warnings.warn(
        "SRIM subpackage only works for Windows. " f"'{platform}' not supported."
    )


[docs] @dataclass(kw_only=True) class SRIMDB(sqlite3.Connection): """Base class for running SRIM calculations and storing the output data in a SQLite database. Attributes ---------- path_db : Path Output database path. target : Target, optional SRIM target. calculation : str, optional SRIM calculation. check_interval : float Interval to check for SRIM window/popups. srim_path : Path Where all SRIM output files are. If given, it will automatically add all those files into the database. con : sqlite3.Connection Database connection. backscat : Backscat Class storing `BACKSCAT.txt` data. e2recoil : E2Recoil Class storing `E2RECOIL.txt` data. ioniz : Ioniz Class storing `IONIZ.txt` data. lateral : Lateral Class storing `LATERAL.txt` data. phonon : Phonon Class storing `PHONON.txt` data. range : Range Class storing `RANGE.txt` data. range3d : Range3D Class storing `RANGE_3D.txt` data. sputter : Sputter Class storing `SPUTTER.txt` data. transmit : Transmit Class storing `TRANSMIT.txt` data. trimdat : Trimdat Class storing `TRIM.DAT` data. vacancy : Vacancy Class storing `VACANCY.txt` data. """ path_db: Path calculation: None | str = None target: None | Target = None check_interval: float = 0.2 seed: int = 0 reminders: int = 0 plot_type: int = 5 xmin: float = 0.0 xmax: float = 0.0 do_ranges: int = 1 do_backscatt: int = 1 do_transmit: int = 1 do_sputtered: int = 1 do_collisions: int = 1 exyz: float = 0.0 bragg: int = 1 autosave: int = 0 dir_srim: Path = field(default_factory=lambda: config.DIR_SRIM) def __post_init__(self) -> None: """Initializes the `SRIMDB` object. Parameters ---------- path_db : Path Output database path. target : Target, optional (default=None) SRIM target. Do not provide this argument for read only. calculation : Calculation, optional (default=None) SRIM calculation. Do not provide this argument for read only. check_interval : float, optional (default=0.2) Interval to check for SRIM window/popups. """ super().__init__(self.path_db) self.backscat = Backscat(self) self.e2recoil = E2Recoil(self) self.ioniz = Ioniz(self) self.lateral = Lateral(self) self.phonon = Phonon(self) self.range = Range(self) self.range3d = Range3D(self) self.sputter = Sputter(self) self.transmit = Transmit(self) self.trimdat = Trimdat(self) self.vacancy = Vacancy(self) self.collision = Collision(self) self.subcollision = Subcollision(self) if self.calculation not in ["quick", "full", "mono", None]: raise ValueError("Invalid calculation mode.") self.nions = self.get_nions() if self.target and self.calculation and not self.table_exists("calculation"): self.save_target_calculation() elif ( not self.target and not self.calculation and self.table_exists("calculation") ): self.target = self.load_target_calculation() elif (self.target and not self.calculation) or ( not self.target and self.calculation ): raise ValueError( "Both `target` and `calculation` must be provided or None." ) if self.calculation == "quick": self._filter_subcollisions_logic = self.__filter_subcollisions_logic_qc else: self.novac = Novac(self) self._filter_subcollisions_logic = self.__filter_subcollisions_logic_fc def __exit__( self, exc_type: None | type[BaseException] = None, exc_value: None | BaseException = None, exc_traceback: None | TracebackType = None, ) -> bool: """Exit the runtime context related to this object.""" self.close() return False
[docs] def save_target_calculation(self) -> None: """Saves the target and calculation parameters into the database.""" cur = self.cursor() cur.execute( ( "CREATE TABLE IF NOT EXISTS layers(" "layer_numb INTEGER, width REAL," "phase INTEGER, density REAL, bragg INTEGER)" ) ) cur.execute( ( "CREATE TABLE IF NOT EXISTS elements(" "layer_numb INTEGER, stoich REAL, symbol TEXT," "atomic_number INTEGER, atomic_mass REAL," "e_d REAL, e_l REAL, e_s REAL)" ) ) for i, layer in enumerate(self.target.layers): cur.execute( ( "INSERT INTO layers" "(layer_numb, width, phase, density, bragg)" "VALUES(?, ?, ?, ?, ?)" ), [i, layer.width, layer.phase, layer.density, layer.bragg], ) for j, element in enumerate(layer.elements): cur.execute( ( "INSERT INTO elements" "(layer_numb, stoich, symbol, atomic_number, atomic_mass, e_d, e_l, e_s)" "VALUES(?, ?, ?, ?, ?, ?, ?, ?)" ), [ i, layer.stoichs[j], element.symbol, element.atomic_number, element.atomic_mass, element.e_d, element.e_l, element.e_s, ], ) cur.execute( ( "CREATE TABLE IF NOT EXISTS calculation(" "mode INTEGER, seed INTEGER, reminders INTEGER, plot_type INTEGER," "xmin REAL, xmax REAL, ranges INTEGER, backscatt INTEGER," "transmit INTEGER, sputtered INTEGER, collisions INTEGER, exyz REAL," "bragg INTEGER, autosave INTEGER)" ) ) if self.calculation == "quick": calculation = 4 elif self.calculation == "full": calculation = 5 else: calculation = 7 cur.execute( ( "INSERT INTO calculation" "(mode, seed, reminders, plot_type, xmin, xmax, ranges, backscatt," "transmit, sputtered, collisions, exyz, bragg, autosave)" "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ), [ calculation, self.seed, self.reminders, self.plot_type, self.xmin, self.xmax, self.do_ranges, self.do_backscatt, self.do_transmit, self.do_sputtered, self.do_collisions, self.exyz, self.bragg, self.autosave, ], ) cur.close() self.commit()
[docs] def load_target_calculation(self) -> Target: """Loads the target and calculation parameters from the database.""" cur = self.cursor() cur.execute("SELECT * FROM layers") db_layers = list(cur.fetchall()) layers = [] for db_layer in db_layers: cur.execute(("SELECT * FROM elements " f"WHERE layer_numb = {db_layer[0]}")) db_elements = list(cur.fetchall()) elements = [] for db_element in db_elements: elements.append( Element( db_element[2], db_element[3], db_element[4], db_element[5], db_element[6], db_element[7], ) ) stoichs = [db_element[1] for db_element in db_elements] layers.append( Layer( db_layer[1], db_layer[2], db_layer[3], elements, stoichs, db_layer[4], ) ) cur.execute("SELECT * FROM calculation") db_calculation = cur.fetchone() cur.close() self.seed = db_calculation[1] self.reminders = db_calculation[2] self.plot_type = db_calculation[3] self.xmin = db_calculation[4] self.xmax = db_calculation[5] self.do_ranges = db_calculation[6] self.do_backscatt = db_calculation[7] self.do_transmit = db_calculation[8] self.do_sputtered = db_calculation[9] self.do_collisions = db_calculation[10] self.exyz = db_calculation[11] self.bragg = db_calculation[12] self.autosave = db_calculation[13] mode = db_calculation[0] if mode == 4: self.calculation = "quick" elif mode == 5: self.calculation = "full" else: self.calculation = "mono" cur.close() return Target(layers)
[docs] def optimize(self) -> None: """Optimize the SQLite database. This method performs two operations to optimize the database: 1. Executes the "PRAGMA optimize" command to analyze and optimize the database. 2. Executes the "VACUUM" command to rebuild the database file, repacking it into a minimal amount of disk space. """ cur = self.cursor() cur.execute("PRAGMA optimize") cur.execute("VACUUM") cur.close()
[docs] def table_exists(self, table_name: str) -> bool: """Checks if the given table already exists in the database. Parameters ---------- table_name : str Table's name to check. Returns ------- bool Whether the table already exists or not. """ cur = self.cursor() cur.execute( ( "SELECT COUNT(*) FROM sqlite_master WHERE type='table'" f"AND name='{table_name}'" ) ) result = cur.fetchone()[0] cur.close() return bool(result)
[docs] def get_nions(self) -> int: """Gets the number of ions in the simulation.""" if self.table_exists("trimdat"): cur = self.cursor() cur.execute("SELECT COUNT(1) FROM trimdat") nions = cur.fetchone()[0] cur.close() return nions return 0
[docs] def append_backscat(self, backscat_path: Path) -> None: """Appends `BACKSCAT.txt` into the database. Parameters ---------- backscat_path : Path `BACKSCAT.txt` path. """ self.backscat.process_file(backscat_path)
[docs] def append_e2recoil(self, e2recoil_path: Path) -> None: """Appends `E2RECOIL.txt` into the database. Parameters ---------- e2recoil_path : Path `E2RECOIL.txt` path. """ self.e2recoil.process_file(e2recoil_path)
[docs] def append_ioniz(self, ioniz_path: Path) -> None: """Appends `IONIZ.txt` into the database. Parameters ---------- ioniz_path : Path `IONIZ.txt` path. """ self.ioniz.process_file(ioniz_path)
[docs] def append_lateral(self, lateral_path: Path) -> None: """Appends `LATERAL.txt` into the database. Parameters ---------- lateral_path : Path `LATERAL.txt` path. """ self.lateral.process_file(lateral_path)
[docs] def append_phonon(self, phonon_path: Path) -> None: """Appends `PHONON.txt` into the database. Parameters ---------- phonon_path : Path `PHONON.txt` path. """ self.phonon.process_file(phonon_path)
[docs] def append_range(self, range_path: Path) -> None: """Appends `RANGE.txt` into the database. Parameters ---------- range_path : Path `RANGE.txt` path. """ self.range.process_file(range_path)
[docs] def append_range3d(self, range3d_path: Path) -> None: """Appends `RANGE_3D.txt` into the database. Parameters ---------- range3d_path : Path `RANGE_3D.txt` path. """ self.range3d.process_file(range3d_path)
[docs] def append_sputter(self, sputter_path: Path) -> None: """Appends `SPUTTER.txt` into the database. Parameters ---------- sputter_path : Path `SPUTTER.txt` path. """ self.sputter.process_file(sputter_path)
[docs] def append_transmit(self, transmit_path: Path) -> None: """Appends `TRANSMIT.txt` into the database. Parameters ---------- transmit_path : Path `TRANSMIT.txt` path. """ self.transmit.process_file(transmit_path)
[docs] def append_trimdat(self, trimdat_path: Path) -> None: """Appends `TRIM.DAT` into the database. Parameters ---------- trimdat_path : Path `TRIM.DAT` path. """ self.trimdat.process_file(trimdat_path) self.nions = self.get_nions()
[docs] def append_vacancy(self, vacancy_path: Path) -> None: """Appends `VACANCY.txt` into the database. Parameters ---------- vacancy_path : Path `VACANCY.txt` path. """ self.vacancy.process_file(vacancy_path)
[docs] def append_subcollision(self, collision_path: Path) -> None: """Appends currect iteration `COLLISON.txt` into the database. Parameters ---------- collision_path : Path `COLLISON.txt` path. """ self.subcollision.process_file(collision_path)
[docs] def append_novac(self, novac_path: Path) -> None: """Appends `NOVAC.txt` into the database. Parameters ---------- novac_path : Path `NOVAC.txt` path. """ self.novac.process_file(novac_path)
[docs] def merge( self, srimdb2: "SRIMDB", backscat: bool = True, e2recoil: bool = True, ioniz: bool = True, lateral: bool = True, phonon: bool = True, range3d: bool = True, range_: bool = True, sputter: bool = True, transmit: bool = True, vacancy: bool = True, collision: bool = True, trimdat: bool = True, novac: bool = True, ) -> None: """Merges two databases. Parameters ---------- srimdb2 : SRIMDBIter SRIM database to merge. backscat : bool, optional Merge backscattering data. e2recoil : bool, optional Merge energy to recoil data. ioniz : bool, optional Merge ionization data. lateral : bool, optional Merge lateral data. phonon : bool, optional Merge phonon data. range3d : bool, optional Merge 3D range data. range_ : bool, optional Merge range data. sputter : bool, optional Merge sputtering data. transmit : bool, optional Merge transmission data. vacancy : bool, optional Merge vacancy data. collision : bool, optional Merge collision data. trimdat : bool, optional Merge TRIMDAT data. novac : bool, optional Merge NOVAC data. """ if backscat: self.backscat.merge(srimdb2) if e2recoil: self.e2recoil.merge(srimdb2) if ioniz: self.ioniz.merge(srimdb2) if lateral: self.lateral.merge(srimdb2) if phonon: self.phonon.merge(srimdb2) if range3d: self.range3d.merge(srimdb2) if range_: self.range.merge(srimdb2) if sputter: self.sputter.merge(srimdb2) if transmit: self.transmit.merge(srimdb2) if vacancy: self.vacancy.merge(srimdb2) if collision: self.collision.merge(srimdb2) if self.calculation in ["full", "mono"] and novac: self.novac.merge(srimdb2) if trimdat: self.trimdat.merge(srimdb2) self.optimize()
[docs] def generate_trimin( self, atomic_numbers: npt.NDArray[np.int64], energies: npt.NDArray[np.float64], target: Target, ) -> None: """Generates `TRIM.IN` file.""" nions = len(atomic_numbers) atomic_mass = materials.MATERIALS_BY_ATOMIC_NUMBER[ atomic_numbers[0] ].mass_number energy = np.ceil(energies.max()) / 1e3 if self.calculation == "quick": calculation = 4 elif self.calculation == "full": calculation = 5 else: calculation = 7 trimin_path = self.dir_srim / "TRIM.IN" with open(trimin_path, "w", encoding="utf-8") as file: file.write("TRIM.IN file generated by irradiapy.\n") file.write( ( "ion Z, A, energy, angle, number of ions, Bragg correction, autosave\n" f"{atomic_numbers[0]} {atomic_mass} {energy} 0 " f"{nions} {self.bragg} {self.autosave}\n" ) ) file.write( ( "calculation, seed, reminders\n" f"{calculation} {self.seed} {self.reminders}\n" ) ) file.write( ( "Diskfiles (0=no,1=yes): Ranges, Backscatt, Transmit, Sputtered, " "Collisions(1=Ion;2=Ion+Recoils), Special EXYZ.txt file\n" f"{self.do_ranges} {self.do_backscatt} {self.do_transmit} {self.do_sputtered} " f"{self.do_collisions} {self.exyz}\n" ) ) file.write(target.trimin_description() + "\n") file.write( ( "PlotType (0-5); Plot Depths: Xmin, Xmax(Ang.) [=0 0 for Viewing Full Target]\n" f"{self.plot_type} {self.xmin} {self.xmax}\n" ) ) file.write(target.trimin_target_elements()) file.write(target.trimin_target_layers() + "\n") file.write(target.trimin_phases() + "\n") file.write(target.trimin_bragg() + "\n") file.write(target.trimin_displacement() + "\n") file.write(target.trimin_lattice() + "\n") file.write(target.trimin_surface() + "\n") file.write("Stopping Power Version\n0\n")
[docs] def generate_trimdat( self, atomic_numbers: npt.NDArray[np.int64], energies: npt.NDArray[np.float64], depths: None | npt.NDArray[np.float64] = None, ys: None | npt.NDArray[np.float64] = None, zs: None | npt.NDArray[np.float64] = None, cosxs: None | npt.NDArray[np.float64] = None, cosys: None | npt.NDArray[np.float64] = None, coszs: None | npt.NDArray[np.float64] = None, ) -> npt.NDArray[np.float64]: """Generates `TRIM.DAT` file. Parameters ---------- atomic_numbers : npt.NDArray[np.int64] Atomic numbers. energies : npt.NDArray[np.float64] Energies. depths : npt.NDArray[np.float64], optional Depths. ys : npt.NDArray[np.float64], optional Y positions. zs : npt.NDArray[np.float64], optional Z positions. cosxs : npt.NDArray[np.float64], optional X directions. cosys : npt.NDArray[np.float64], optional Y directions. coszs : npt.NDArray[np.float64], optional Z directions. Returns ------- npt.NDArray[np.float64] `TRIM.DAT` data. """ trimdat_path = self.dir_srim / "TRIM.DAT" nions = atomic_numbers.size if depths is None: depths = np.zeros(nions) if ys is None: ys = np.zeros(nions) if zs is None: zs = np.zeros(nions) if cosxs is None: cosxs = np.ones(nions) if cosys is None: cosys = np.zeros(nions) if coszs is None: coszs = np.zeros(nions) names = np.array([f"{i:06d}" for i in range(1, nions + 1)], dtype=str) with open(trimdat_path, "w", encoding="utf-8") as file: file.write("<TRIM>\n" * 9) file.write("Name atomic_number E x y z cosx cosy cosz\n") for i in range(nions): file.write( ( f"{names[i]} {atomic_numbers[i]} {energies[i]} {depths[i]} {ys[i]} " f"{zs[i]} {cosxs[i]} {cosys[i]} {coszs[i]}\n" ) ) data = np.empty(nions, dtype=dtypes.trimdat) for i in range(nions): data[i]["name"] = names[i] data[i]["atomic_number"] = atomic_numbers[i] data[i]["energy"] = energies[i] data[i]["pos"] = np.array([depths[i], ys[i], zs[i]]) data[i]["dir"] = np.array([cosxs[i], cosys[i], coszs[i]]) return data
def __generate_trimauto(self) -> None: """Generates `TRIMAUTO` file.""" with open(self.dir_srim / "TRIMAUTO", "w", encoding="utf-8") as file: file.write("1\n\nCheck TRIMAUTO.txt for details.\n")
[docs] def minimize_and_handle_popup(self): """Minimizes the SRIM window and handles the end of calculation popup.""" window_title = "SRIM-2013.00" popup_title = "End of TRIM.DAT calculation" if platform == "Windows": # Minimize window while True: windows = pygetwindow.getWindowsWithTitle(window_title) if windows: window = windows[0] app = pywinauto.Application().connect(handle=window._hWnd) app.window(handle=window._hWnd).minimize() break time.sleep(self.check_interval) # Dismiss popup (quick-calculation does not have this popup) if self.calculation != "quick": while True: popups = pygetwindow.getWindowsWithTitle(popup_title) if popups: popup = popups[0] app = pywinauto.Application().connect(handle=popup._hWnd) app.window(handle=popup._hWnd).send_keystrokes("{ENTER}") break time.sleep(self.check_interval) elif platform == "Linux": # Minimize window while True: windows = pygetwindow.getWindowsWithTitle(window_title) if windows: window = windows[0] window_id = window._hWnd # pylint: disable=protected-access subprocess.run( ["xdotool", "windowminimize", str(window_id)], check=True ) break time.sleep(self.check_interval) # Dismiss popup (quick-calculation does not have this popup) if self.calculation != "quick": while True: popups = pygetwindow.getWindowsWithTitle(popup_title) if popups: popup = popups[0] popup_id = popup._hWnd # pylint: disable=protected-access subprocess.run( ["xdotool", "windowactivate", str(popup_id)], check=True ) subprocess.run(["xdotool", "key", "Return"], check=True) break time.sleep(self.check_interval)
[docs] def run( self, criterion: Callable, atomic_numbers: npt.NDArray[np.int64], energies: npt.NDArray[np.float64], remove_offsets: bool, depths: None | npt.NDArray[np.float64] = None, ys: None | npt.NDArray[np.float64] = None, zs: None | npt.NDArray[np.float64] = None, cosxs: None | npt.NDArray[np.float64] = None, cosys: None | npt.NDArray[np.float64] = None, coszs: None | npt.NDArray[np.float64] = None, iter_max: None | int = None, ignore_32bit_warning: bool = True, ) -> None: """Runs the SRIM simulation. Parameters ---------- criterion : Callable Criterion to repeat calculation, must return False to repeat calculation. Its signature is: `criterion(nion, energy, depth, y, z, se, atom_hit, pka_e, target_disp)`. Recommended to be defined as `def criterion(**kwargs: dict) -> bool:`. atomic_numbers : npt.NDArray[np.int64] Ion atomic numbers. energies : npt.NDArray[np.float64] Ion energies. remove_offsets : bool Whether to remove initial depth offsets or not. depths : npt.NDArray[np.float64], optional Ion initial depths. ys : npt.NDArray[np.float64], optional Ion initial y positions. zs : npt.NDArray[np.float64], optional Ion initial z positions. cosxs : npt.NDArray[np.float64], optional Ion initial x directions. cosys : npt.NDArray[np.float64], optional Ion initial y directions. coszs : npt.NDArray[np.float64], optional Ion initial z directions. iter_max : int, optional (default=None) Maximum number of iterations. """ if ignore_32bit_warning: warnings.filterwarnings( "ignore", category=UserWarning, message="32-bit application should be automated using 32-bit Python", ) if self.table_exists("collision"): raise RuntimeError( ( f"The database {self.path_db} is already populated " "with data from another simulation, use another one." ) ) self.collision.create_table() cur = self.cursor() # First iteration nions = len(atomic_numbers) nsubcollisions = np.ones(nions, dtype=np.int64) nsubcollisions0 = nsubcollisions.copy() trimdat = self._run_iter( self.target, atomic_numbers, energies, depths, ys, zs, cosxs, cosys, coszs, ) self.__append_output() ( nsubcollisions, atomic_numbers, energies, depths, ys, zs, cosxs, cosys, coszs, ) = self._filter_subcollisions(cur, nions, trimdat, nsubcollisions0, criterion) nsubcollisions_total = nsubcollisions.sum() niter = 1 while nsubcollisions_total and niter != iter_max: niter += 1 nsubcollisions0 = nsubcollisions.copy() trimdat = self._run_iter( self.target, atomic_numbers, energies, depths, ys, zs, cosxs, cosys, coszs, ) ( nsubcollisions, atomic_numbers, energies, depths, ys, zs, cosxs, cosys, coszs, ) = self._filter_subcollisions( cur, nions, trimdat, nsubcollisions0, criterion ) nsubcollisions_total = nsubcollisions.sum() # Reordering database might reduce I/O operations filtered by ion number # PKAs are not sequentally ordered cur.execute( "CREATE TABLE collision0 AS SELECT * FROM collision ORDER BY ion_numb" ) cur.execute("DROP TABLE collision") cur.execute("ALTER TABLE collision0 RENAME TO collision") cur.execute( "CREATE INDEX ionRecoilEenrgyIdx ON collision(ion_numb, recoil_energy)" ) self.commit() cur.close() if remove_offsets: self.__remove_offsets() self.optimize() if ignore_32bit_warning: warnings.filterwarnings( "default", category=UserWarning, message="32-bit application should be automated using 32-bit Python", )
def _run_iter( self, target: Target, atomic_numbers: npt.NDArray[np.int64], energies: npt.NDArray[np.float64], depths: None | npt.NDArray[np.float64] = None, ys: None | npt.NDArray[np.float64] = None, zs: None | npt.NDArray[np.float64] = None, cosxs: None | npt.NDArray[np.float64] = None, cosys: None | npt.NDArray[np.float64] = None, coszs: None | npt.NDArray[np.float64] = None, ) -> dtypes.Trimdat: """Runs SRIM for a single iteration. Parameters ---------- target : Target Target material. calculation : Calculation Calculation parameters. atomic_numbers : npt.NDArray[np.int64] Ion atomic numbers. energies : npt.NDArray[np.float64] Ion energies. depths : npt.NDArray[np.float64], optional Ion initial depths. ys : npt.NDArray[np.float64], optional Ion initial y positions. zs : npt.NDArray[np.float64], optional Ion initial z positions. cosxs : npt.NDArray[np.float64], optional Ion initial x directions. cosys : npt.NDArray[np.float64], optional Ion initial y directions. coszs : npt.NDArray[np.float64], optional Ion initial z directions. Returns ------- dtypes.Trimdat `TRIM.DAT` data. """ self.__generate_trimauto() self.generate_trimin(atomic_numbers, energies, target) trimdat = self.generate_trimdat( atomic_numbers, energies, depths=depths, ys=ys, zs=zs, cosxs=cosxs, cosys=cosys, coszs=coszs, ) # Run SRIM print(f"Running {len(atomic_numbers)} SRIM ions") try: window_thread = threading.Thread(target=self.minimize_and_handle_popup) window_thread.start() curr_dir = os.getcwd() os.chdir(self.dir_srim) subprocess.check_call([Path("./TRIM.exe")]) os.chdir(curr_dir) window_thread.join() except subprocess.CalledProcessError as e: print(traceback.format_exc()) print(f"An error occurred while running the subprocess: {e}") return trimdat def __append_output(self) -> None: """Appends SRIM output files into the database. Note ---- Although reading and saving information from the TRIM.DAT and TRIM.IN files instead of directly using the Python objects provided as input to SRIMDB.run may seem redundant, this approach is maintained to preserve the ability to read SRIM configuration files. The resulting performance loss is relatively minor. """ self.append_backscat(self.dir_srim / "SRIM Outputs/BACKSCAT.txt") self.append_e2recoil(self.dir_srim / "E2RECOIL.txt") self.append_ioniz(self.dir_srim / "IONIZ.txt") self.append_lateral(self.dir_srim / "LATERAL.txt") self.append_phonon(self.dir_srim / "PHONON.txt") self.append_range3d(self.dir_srim / "SRIM Outputs/RANGE_3D.txt") self.append_range(self.dir_srim / "RANGE.txt") self.append_sputter(self.dir_srim / "SRIM Outputs/SPUTTER.txt") self.append_transmit(self.dir_srim / "SRIM Outputs/TRANSMIT.txt") self.append_trimdat(self.dir_srim / "TRIM.DAT") self.append_vacancy(self.dir_srim / "VACANCY.txt") if self.calculation in ["full", "mono"]: self.append_novac(self.dir_srim / "NOVAC.txt") def _get_dir( self, pos0: npt.NDArray[np.float64], pos: npt.NDArray[np.float64] ) -> npt.NDArray[np.float64]: """Gets the direction from two positions. Parameters pos0 : npt.NDArray[np.float64] Initial position. pos : npt.NDArray[np.float64] Final position. """ diff = pos - pos0 with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=RuntimeWarning, message="invalid value encountered in divide", ) direction = diff / np.linalg.norm(diff) return direction def __filter_subcollisions_logic_qc(self, subcollision_data: tuple) -> dict: """Convert subcollision data into dictionary for later handling for Quick-Calculation mode. Parameters ---------- subcollision_data : tuple Subcollision data. """ _, energy, depth, y, z, se, atom_hit, recoil_energy, target_disp = ( subcollision_data ) return { "energy": energy, "depth": depth, "y": y, "z": z, "se": se, "atom_hit": atom_hit, "recoil_energy": recoil_energy, "target_disp": target_disp, } def __filter_subcollisions_logic_fc(self, subcollision_data: tuple) -> dict: """Convert subcollision data into dictionary for later handling for Full-Calculation mode. Parameters ---------- subcollision_data : tuple Subcollision data. """ ( _, energy, depth, y, z, se, atom_hit, recoil_energy, target_disp, target_vac, target_replac, target_inter, ) = subcollision_data return { "energy": energy, "depth": depth, "y": y, "z": z, "se": se, "atom_hit": atom_hit, "recoil_energy": recoil_energy, "target_disp": target_disp, "target_vac": target_vac, "target_replac": target_replac, "target_inter": target_inter, } def _filter_subcollisions( self, cur: sqlite3.Cursor, nions: int | np.int64, trimdat: dtypes.Trimdat, nsubcollisions0: npt.NDArray[np.int64], criterion: Callable, ) -> tuple: """Filters subcollisions. Parameters ---------- cur : sqlite3.Cursor Database cursor. nions : int | np.int64 Number of ions. trimdat : dtypes.Trimdat TRIMDAT data. nsubcollisions0 : npt.NDArray[np.int64] Initial number of subcollisions. criterion : Callable Criterion to repeat calculation, must return False to repeat calculation. """ self.append_subcollision(self.dir_srim / "SRIM Outputs/COLLISON.txt") atomic_numbers, recoil_energies, depths, ys, zs, cosxs, cosys, coszs = ( [], [], [], [], [], [], [], [], ) nsubcollisions = np.zeros(nions, dtype=np.int64) for nion in range(nions): nsubcollision_min = 0 if nion == 0 else nsubcollision_max nsubcollision_max = nsubcollision_min + nsubcollisions0[nion] for nsubcollision in range(nsubcollision_min, nsubcollision_max): pos0 = trimdat[nsubcollision]["pos"] cosx0, cosy0, cosz0 = trimdat[nsubcollision]["dir"] for subcollision in self.subcollision.read( condition=f"WHERE ion_numb = {nsubcollision + 1}" ): subcollision = self._filter_subcollisions_logic(subcollision) pos = np.array( [subcollision["depth"], subcollision["y"], subcollision["z"]] ) cosx, cosy, cosz = self._get_dir(pos0, pos) # There are some rare cases, specially at high energies, # when there are two PKA at the same position. I think this # is because they are really close and when saved into # COLLISON.txt, positions are rounded and they coincide. # In such cases (if statement triggered), we assume that the # second PKA has the same direction as the first one. if np.isnan(cosx) or np.isnan(cosy) or np.isnan(cosz): warnings.warn( ( "Two PKAs at the same position, assuming same direction. This is " "likely because they are really close and when saved into " "COLLISON.txt, positions are rounded and they coincide." ), RuntimeWarning, ) cosx, cosy, cosz = cosx0, cosy0, cosz0 else: cosx0, cosy0, cosz0 = cosx, cosy, cosz subcollision.update({"cosx": cosx, "cosy": cosy, "cosz": cosz}) # Check if SRIM has to be run again ok = criterion(nion=nion + 1, **subcollision) # If not, save into the database if ok: self.collision.insert(cur, nion + 1, **subcollision) # Else, save it for later else: atomic_numbers.append( materials.MATERIALS_BY_SYMBOL[ subcollision["atom_hit"] ].atomic_number ) recoil_energies.append(subcollision["recoil_energy"]) depths.append(subcollision["depth"]) ys.append(subcollision["y"]) zs.append(subcollision["z"]) cosxs.append(subcollision["cosx"]) cosys.append(subcollision["cosy"]) coszs.append(subcollision["cosz"]) nsubcollisions[nion] += 1 pos0 = pos # print(pos0, [cosx, cosy, cosz]) self.subcollision.empty() atomic_numbers = np.array(atomic_numbers, dtype=np.int64) recoil_energies, depths, ys, zs = ( np.array(recoil_energies, dtype=np.float64), np.array(depths, dtype=np.float64), np.array(ys, dtype=np.float64), np.array(zs, dtype=np.float64), ) cosxs, cosys, coszs = np.array(cosxs), np.array(cosys), np.array(coszs) return ( nsubcollisions, atomic_numbers, recoil_energies, depths, ys, zs, cosxs, cosys, coszs, ) def __remove_mean_depth_offsets( self, cur: sqlite3.Cursor, table_name: str, depth_mean: tuple[float] ) -> None: """Removes ion mean depth offsets from the given table. Parameters ---------- cur : sqlite3.Cursor Database cursor. table_name : str Table name. depth_mean : tuple[float] Mean depth. """ cur.execute(f"UPDATE {table_name} SET depth = depth - ?", depth_mean) def __remove_individual_offsets(self, cur: sqlite3.Cursor, table_name: str) -> None: """Removes individual ion offsets from the given table. Parameters ---------- cur : sqlite3.Cursor Database cursor. table_name : str Table name. """ cur.execute( f""" UPDATE {table_name} SET depth = {table_name}.depth - trimdat.depth, y = {table_name}.y - trimdat.y, z = {table_name}.z - trimdat.z FROM trimdat WHERE {table_name}.ion_numb = trimdat.ion_numb """ ) def __remove_offsets(self) -> None: """Removes ion offsets.""" cur = self.cursor() cur.execute("SELECT AVG(depth) FROM trimdat") depth_mean = cur.fetchone() self.__remove_mean_depth_offsets(cur, "e2recoil", depth_mean) self.__remove_mean_depth_offsets(cur, "ioniz", depth_mean) self.__remove_mean_depth_offsets(cur, "lateral", depth_mean) self.__remove_mean_depth_offsets(cur, "phonon", depth_mean) self.__remove_mean_depth_offsets(cur, "range", depth_mean) self.__remove_mean_depth_offsets(cur, "vacancy", depth_mean) if self.calculation in ["full", "mono"]: self.__remove_mean_depth_offsets(cur, "novac", depth_mean) self.__remove_individual_offsets(cur, "backscat") self.__remove_individual_offsets(cur, "collision") self.__remove_individual_offsets(cur, "range3d") self.__remove_individual_offsets(cur, "sputter") self.__remove_individual_offsets(cur, "transmit") self.__remove_individual_offsets(cur, "transmit") cur.close() self.commit()