"""Utility functions for I/O operations."""
import bz2
from collections import deque
from collections.abc import Iterable
from pathlib import Path
from typing import Any
import numpy as np
from irradiapy.io import (
BZIP2LAMMPSReader,
BZIP2LAMMPSWriter,
LAMMPSLogReader,
LAMMPSReader,
LAMMPSWriter,
)
from irradiapy.utils.math import apply_boundary_conditions
[docs]
def compress_file_bz2(
input_path: str, output_path: str, compresslevel: int = 9
) -> None:
"""Compress a file using bzip2.
Parameters
----------
input_path : str
Path to the input file to be compressed.
output_path : str
Path where the compressed file will be saved.
compresslevel : int, optional (default=9)
Compression level for bzip2.
"""
with (
open(input_path, "rb") as f_in,
bz2.open(output_path, "wb", compresslevel=compresslevel) as f_out,
):
for chunk in iter(lambda: f_in.read(1024 * 1024), b""):
f_out.write(chunk)
[docs]
def decompress_file_bz2(input_path: str, output_path: str) -> None:
"""Decompress a bzip2-compressed file.
Parameters
----------
input_path : str
Path to the input .bz2 file.
output_path : str
Path to the output decompressed file.
"""
with bz2.open(input_path, "rb") as f_in, open(output_path, "wb") as f_out:
for chunk in iter(lambda: f_in.read(1024 * 1024), b""):
f_out.write(chunk)
[docs]
def get_last_reader(
reader: Iterable[dict[str, Any]],
) -> dict[str, Any]:
"""Get the last snapshot from a LAMMPS dump file using a reader.
Parameters
----------
reader : Iterable[dict[str, Any]]
An instance of a LAMMPS reader.
Returns
-------
dict[str, Any]
The last snapshot from the LAMMPS file.
"""
return deque(reader, maxlen=1).pop()
[docs]
def merge_lammps_snapshots(
in_path: Path, out_path: Path, overwrite: bool = False
) -> dict[str, Any]:
"""Merge multiple snapshots in a LAMMPS file into a single snapshot.
Parameters
----------
in_path : Path
Path to the input LAMMPS file (bzip2 compressed or not).
out_path : Path
Path to the output LAMMPS file (bzip2 compressed or not).
overwrite : bool, optional (default=False)
Whether to overwrite the output file if it exists.
Returns
-------
dict[str, Any]
A dictionary containing the merged snapshot data.
"""
if not overwrite and out_path.exists():
raise FileExistsError(f"Output file {out_path} already exists.")
elif out_path.exists():
out_path.unlink()
if in_path.suffix == ".bz2":
reader = BZIP2LAMMPSReader(in_path)
else:
reader = LAMMPSReader(in_path)
if out_path.suffix == ".bz2":
writer = BZIP2LAMMPSWriter(out_path, mode="a")
else:
writer = LAMMPSWriter(out_path, mode="a")
data_atoms_list = []
for data_atoms in reader:
data_atoms_list.append(data_atoms)
reader.close()
data_atoms_merged = {}
data_atoms_merged["timestep"] = data_atoms_list[-1]["timestep"]
data_atoms_merged["time"] = data_atoms_list[-1]["time"]
data_atoms_merged["boundary"] = data_atoms_list[-1]["boundary"]
data_atoms_merged["xlo"] = data_atoms_list[-1]["xlo"]
data_atoms_merged["xhi"] = data_atoms_list[-1]["xhi"]
data_atoms_merged["ylo"] = data_atoms_list[-1]["ylo"]
data_atoms_merged["yhi"] = data_atoms_list[-1]["yhi"]
data_atoms_merged["zlo"] = data_atoms_list[-1]["zlo"]
data_atoms_merged["zhi"] = data_atoms_list[-1]["zhi"]
data_atoms_merged["atoms"] = np.concatenate(
[data_atoms["atoms"] for data_atoms in data_atoms_list]
)
writer.write(data_atoms_merged)
writer.close()
return data_atoms_merged
[docs]
def apply_boundary_conditions_to_lammps(
in_path: Path, out_path: Path, x: bool, y: bool, z: bool, overwrite: bool = False
) -> None:
"""Apply periodic boundary conditions to a LAMMPS dump file.
Parameters
----------
in_path : Path
Path to the input LAMMPS file (bzip2 compressed or not).
out_path : Path
Path to the output LAMMPS file (bzip2 compressed or not).
x : bool
Whether to apply periodic boundary conditions in the x direction.
y : bool
Whether to apply periodic boundary conditions in the y direction.
z : bool
Whether to apply periodic boundary conditions in the z direction.
overwrite : bool, optional (default=False)
Whether to overwrite the output file if it exists.
"""
if not overwrite and out_path.exists():
raise FileExistsError(f"Output file {out_path} already exists.")
elif out_path.exists():
out_path.unlink()
if in_path.suffix == ".bz2":
reader = BZIP2LAMMPSReader(in_path)
else:
reader = LAMMPSReader(in_path)
if out_path.suffix == ".bz2":
writer = BZIP2LAMMPSWriter(out_path, mode="a")
else:
writer = LAMMPSWriter(out_path, mode="a")
for data_atoms in reader:
data_atoms = apply_boundary_conditions(
data_atoms=data_atoms,
x=x,
y=y,
z=z,
)
writer.write(data_atoms)
reader.close()
writer.close()
[docs]
def lammps_to_mmonca(path_in: str, path_out: str, scale: float = 10.0) -> None:
"""Convert LAMMPS dump file to MMonCa format. Uses LAMMPSReader.
Parameters
----------
path_in : str
Path to the input LAMMPS dump file.
path_out : str
Path to the output MMonCa file.
scale : float, optional (default=10.0)
Divisor applied to coordinates. The default converts angstroms to nanometers.
Warning
-------
This converter is very basic: defects of type 0 are considered vacancies (V),
and all other defect types are considered interstitials (I). Therefore, this will
produce wrong results when multiple elements are present.
"""
total = 0
with open(path_out, "w", encoding="utf-8") as ofile:
nions = 0
for data_defects in LAMMPSReader(path_in):
defects = data_defects["atoms"]
nions += 1
natoms = len(defects)
natom = 0
ofile.write(f"{natoms}\n")
for defect in defects:
natom += 1
x = defect["x"] / scale
y = defect["y"] / scale
z = defect["z"] / scale
kind = "V" if defect["type"] == 0 else "I"
ofile.write(f"{natom} {kind} {x} {y} {z}\n")
total += natom
print(f"Total mean defects (vacs + sias): {total/nions}")