"""Class for cascade simulations."""
# pylint: disable=line-too-long
import copy
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import numpy as np
import numpy.typing as npt
import irradiapy.lammps.commands as cmds
from irradiapy import utils
from irradiapy.io.lammpslogreader import LAMMPSLogReader
from irradiapy.lammps.cascadebase import CascadeBase
from irradiapy.lammps.utils import load_lammps_class, get_properties_from_group
[docs]
@dataclass(kw_only=True)
class Cascade(CascadeBase):
"""Class for cascade simulations.
Data files will automatically be generated.
You might use multiple dump files, an index will be added to the file name.
Rerunning requires the dt, time and timestep from the thermo output of the previous run.
Attributes
----------
ion_energy : float
The energy of the PKA ion.
pka_target : npt.NDArray[np.float64]
The target position of the PKA. PKAs will be directed towards this position.
cmds_ions_grouping : list[cmds.Command]
The list of commands for ion grouping. PKAs will be randomly selected from this group.
"""
ion_energy: float
pka_target: npt.NDArray[np.float64]
cmds_ions_grouping: list[cmds.Command]
[docs]
def run(self) -> None:
"""Run the bulk cascade simulations."""
lammps_class = load_lammps_class()
self._run(lammps_class)
@utils.mpi.mpi_safe_method
def _run(self, lammps_class: type[Any]) -> None:
# Initialise directory
self.dir_parent.mkdir(parents=True, exist_ok=True)
# Thermalisation
if self.cmds_therma:
path_data_therma = self.dir_parent / "simulation.data"
path_log_therma = self.dir_parent / "simulation.log"
if self.rank == 0 and path_data_therma.exists():
raise FileExistsError("Thermalisation already done.")
self.comm.Barrier()
try:
lmp = lammps_class(comm=self.comm)
self.__run_thermalisation(lmp)
except Exception as e:
raise e
finally:
lmp.close()
utils.mpi.mv_file(self.path_log_cwd, path_log_therma, comm=self.comm)
if self.cmds_cascade:
# Select ions for cascades
try:
lmp = lammps_class(comm=self.comm)
(
ions_ids,
ions_atomic_numbers,
ions_positions,
ions_speeds,
ions_velocities,
ions_phis,
ions_thetas,
) = self.__select_ions_for_cascades(lmp)
except Exception as e:
raise e
finally:
lmp.close()
utils.mpi.rm_file(self.path_log_cwd, comm=self.comm)
# Run cascades
for idx in range(self.nsims):
dir_cascade = self.dir_parent / f"simulation_{idx+1}"
path_log_cascade = dir_cascade / f"simulation_{idx+1}.log"
if idx + 1 in self.skip:
continue
if self.rank == 0 and dir_cascade.exists():
raise FileExistsError(f"Cascade {idx+1} already exists.")
self.comm.Barrier()
try:
lmp = lammps_class(comm=self.comm)
self.__run_cascade(
lmp,
idx + 1,
ions_ids[idx],
ions_atomic_numbers[idx],
ions_positions[idx],
ions_speeds[idx],
ions_velocities[idx],
ions_phis[idx],
ions_thetas[idx],
)
except Exception as e:
raise e
finally:
lmp.close()
utils.mpi.mv_file(
self.path_log_cwd, path_log_cascade, comm=self.comm
)
if self.cmds_rerun:
for idx in range(self.nsims):
dir_cascade = self.dir_parent / f"simulation_{idx+1}"
path_log_cascade = dir_cascade / f"simulation_{idx+1}.log"
if idx + 1 in self.skip:
continue
if self.rank == 0 and not dir_cascade.exists():
raise FileExistsError(
f"Cascade {idx+1} does not exist, impossible to rerun."
)
self.comm.Barrier()
path_data_cascade = dir_cascade / f"simulation_{idx+1}.data"
timestep0, t0 = None, None
if self.rank == 0:
# extract timestep from data file
data_file = open(path_data_cascade, "r", encoding="utf-8")
line = data_file.readline()
data_file.close()
timestep0 = int(line.split("timestep = ")[1].split(",")[0])
# extract timestep, time and dt from log file
log_path = dir_cascade / f"simulation_{idx+1}.log"
last_log = utils.io.get_last_reader(LAMMPSLogReader(log_path))[
"thermo"
]
log_timestep = int(last_log["Step"][-1])
log_time = float(last_log["Time"][-1])
log_dt = float(last_log["Dt"][-1])
t0 = (timestep0 - log_timestep) * log_dt + log_time
timestep0, t0 = self._broadcast_variables(timestep0, t0)
try:
lmp = lammps_class(comm=self.comm)
self.__rerun(
lmp,
idx + 1,
timestep0,
t0,
)
except Exception as e:
raise e
finally:
lmp.close()
utils.mpi.ap_rm_file(
self.path_log_cwd, path_log_cascade, comm=self.comm
)
if self.finalize:
lmp.finalize()
def __run_thermalisation(self, lmp: Any) -> None:
"""Run the thermalisation.
Parameters
----------
lmp : lammps
LAMMPS instance.
"""
cmds_therma = copy.deepcopy(self.cmds_therma)
for cmd in cmds_therma:
if isinstance(cmd, cmds.Dump):
cmd.file = self.dir_parent / f"simulation{Path(cmd.file).suffix}"
if isinstance(cmd, cmds.Fix):
if cmd.style == "eph":
self.uttm_c = cmd.args[4]
self.uttm_k = cmd.args[5]
self.uttm_temperature = cmd.args[6]
cmd.args[10] = self.dir_parent / "egrid0"
cmd.args[12] = self.dir_parent / "egrid/egrid"
(self.dir_parent / "egrid").mkdir(parents=True, exist_ok=True)
self._generate_egrid()
self.comm.Barrier()
cmd_write = cmds.WriteData(file=self.dir_parent / "simulation.data")
self.exec_cmds(lmp, self.cmds_preamble + cmds_therma + [cmd_write])
def __select_ions_for_cascades(self, lmp: Any) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
np.ndarray,
]:
"""Select ions for cascades, generate random velocities, and broadcast their properties."""
cmd_read = cmds.ReadData(file=self.dir_parent / "simulation.data")
self.exec_cmds(lmp, self.cmds_preamble + [cmd_read] + self.cmds_ions_grouping)
# Collect ions properties (full group)
ions_group = "all"
for cmd in self.cmds_ions_grouping:
if isinstance(cmd, cmds.Group):
ions_group = cmd.id
ions_properties = get_properties_from_group(
lmp, ions_group, ["id", "type", "x"]
) # for each process this variable is different
ions_ids = ions_atomic_numbers = ions_positions = ions_speeds = None
ions_velocities = ions_phis = ions_thetas = None
# This is a list of dictionaries, one for each process
ions_properties_gathered = self.comm.gather(ions_properties, root=0)
if self.rank == 0:
# Gather ions properties from all processes into a single dictionary
ions_properties = {
key: np.concatenate([g[key] for g in ions_properties_gathered])
for key in ions_properties
}
# Reorder by id in ascending order for reproducibility
order = np.argsort(ions_properties["id"])
ions_properties = {
"id": ions_properties["id"][order],
"type": ions_properties["type"][order],
"x": ions_properties["x"][order],
}
ions_properties["atomic_number"] = self.types_to_atomic_numbers(
ions_properties["type"]
)
# Select random ions and generate random velocities and angles
idxs = self._rng.integers(0, ions_properties["id"].size, size=self.nsims)
ions_ids = ions_properties["id"][idxs]
ions_atomic_numbers = ions_properties["atomic_number"][idxs]
ions_positions = ions_properties["x"][idxs]
ions_speeds = self._calculate_atoms_speeds(
np.full(self.nsims, self.ion_energy), ions_atomic_numbers
)
# The ion direction is towards the center of the box
dirxs = self.pka_target[0] - ions_positions[:, 0]
dirys = self.pka_target[1] - ions_positions[:, 1]
dirzs = self.pka_target[2] - ions_positions[:, 2]
directions = np.stack([dirxs, dirys, dirzs], axis=1)
# Normalize directions
directions /= np.linalg.norm(directions, axis=1)[:, np.newaxis]
# Get actual velocities
ions_velocities = ions_speeds[:, np.newaxis] * directions
# Get polar and azimuthal angles
ions_thetas = np.arccos(directions[:, 2]) # polar angle
ions_phis = np.arctan2(
directions[:, 1], directions[:, 0]
) # azimuthal angle
self.comm.Barrier()
(
ions_ids,
ions_atomic_numbers,
ions_positions,
ions_speeds,
ions_velocities,
ions_phis,
ions_thetas,
) = self._broadcast_variables(
ions_ids,
ions_atomic_numbers,
ions_positions,
ions_speeds,
ions_velocities,
ions_phis,
ions_thetas,
)
return (
ions_ids,
ions_atomic_numbers,
ions_positions,
ions_speeds,
ions_velocities,
ions_phis,
ions_thetas,
)
def __run_cascade(
self,
lmp: Any,
idx: int,
ion_id: np.integer,
ion_atomic_number: np.integer,
ion_position: np.ndarray,
ion_speed: float,
ion_velocity: np.ndarray,
ion_phi: float,
ion_theta: float,
) -> None:
"""Run the bulk irradiation.
Parameters
----------
lmp : lammps
LAMMPS instance.
idx : int
Index of the simulation.
ion_id : int
Ion ID.
ion_atomic_number : int
Ion atomic number.
ion_position : np.ndarray
Ion position.
ion_speed : float
Ion speed.
ion_velocity : np.ndarray
Ion velocity.
ion_phi : float
Ion azimuthal angle.
ion_theta : float
Ion polar angle.
"""
dir_cascade = self.dir_parent / f"simulation_{idx}"
dir_cascade.mkdir(parents=True, exist_ok=True)
cmds_cascade = copy.deepcopy(self.cmds_cascade)
# Handle log
# Reading and writing data/restart files
cmd_read = cmds.ReadData(file=self.dir_parent / "simulation.data")
cmd_write = cmds.WriteData(file=dir_cascade / f"simulation_{idx}.data")
# Handle dump and fix commands
for cmd in cmds_cascade:
if isinstance(cmd, cmds.Dump):
path = Path(cmd.file)
cmd.file = dir_cascade / f"{path.stem}_{idx}{path.suffix}"
if isinstance(cmd, cmds.Fix) and cmd.id == "rdf":
cmd.kw_vals["file"] = dir_cascade / f"rdf_{idx}.rdf"
if isinstance(cmd, cmds.Fix) and cmd.style == "eph":
path_egrid_cascade = dir_cascade / "egrid"
path_egrid_cascade.mkdir(parents=True, exist_ok=True)
cmd.args[10] = self.dir_parent / "egrid0.restart"
cmd.args[12] = path_egrid_cascade / "egrid"
# Print ion properties
cmds_print = [
cmds.Print(string="'Ion boost:'"),
cmds.Print(string=f"' ID: {ion_id}'"),
cmds.Print(string=f"' Element: {ion_atomic_number}'"),
cmds.Print(
string=f"' Position: ({ion_position[0]}, {ion_position[1]}, {ion_position[2]}) angstrom'"
),
cmds.Print(string=f"' Energy: {self.ion_energy} eV'"),
cmds.Print(string=f"' Speed: {ion_speed} angstrom/ps'"),
cmds.Print(
string=f"' Velocity: ({ion_velocity[0]}, {ion_velocity[1]}, {ion_velocity[2]}) angstrom/ps'"
),
cmds.Print(string=f"' Polar angle (theta): {ion_theta} rad'"),
cmds.Print(string=f"' Azimuthal angle (phi): {ion_phi} rad'"),
]
# Replace ion boosting commands
for i, cmd in enumerate(cmds_cascade):
if isinstance(cmd, cmds.MBoostIon):
cmds_cascade[i] = cmds.Group(
id="ion",
style="id",
args=[ion_id],
)
cmd_velocity = cmds.Velocity(
group_id="ion",
style="set",
args=[ion_velocity[0], ion_velocity[1], ion_velocity[2]],
kw_vals={"units": "box"},
)
cmds_cascade.insert(i + 1, cmd_velocity)
cmds_cascade = [
*self.cmds_preamble,
cmd_read,
cmds.ResetTimestep(n=0),
*cmds_print,
*cmds_cascade,
cmd_write,
]
self.comm.Barrier()
self.exec_cmds(lmp, cmds_cascade)
if self._uttm:
utils.mpi.mv_file(
self.dir_parent / "egrid0.restart.restart",
dir_cascade / "egrid0.restart",
comm=self.comm,
)
def __rerun(
self,
lmp: Any,
idx: int,
timestep0: int,
t0: float,
) -> None:
"""Run the bulk irradiation.
Parameters
----------
lmp : lammps
LAMMPS instance.
idx : int
Index of the simulation.
timestep0 : int
Initial timestep of the simulation.
t0 : float
Initial time of the simulation.
"""
dir_cascade = self.dir_parent / f"simulation_{idx}"
cmds_rerun = copy.deepcopy(self.cmds_rerun)
# Reading and writing data/restart files
path_data_cascade = dir_cascade / f"simulation_{idx}.data"
cmd_read = cmds.ReadData(file=path_data_cascade)
cmd_write = cmds.WriteData(file=path_data_cascade)
# Handle dump and fix commands
for cmd in cmds_rerun:
if isinstance(cmd, cmds.Dump):
path = Path(cmd.file)
cmd.file = dir_cascade / f"{path.stem}_{idx}{path.suffix}"
if isinstance(cmd, cmds.Fix) and cmd.style == "eph":
cmd.args[10] = dir_cascade / "egrid0.restart"
cmd.args[12] = dir_cascade / "egrid/egrid"
cmds_rerun = [
*self.cmds_preamble,
cmd_read,
cmds.ResetTimestep(n=timestep0, kw_vals={"time": t0}),
*cmds_rerun,
cmd_write,
]
self.comm.Barrier()
self.exec_cmds(lmp, cmds_rerun)
if self._uttm:
utils.mpi.mv_file(
dir_cascade / "egrid0.restart.restart",
dir_cascade / "egrid0.restart",
comm=self.comm,
)