Source code for irradiapy.utils.io

"""Utility functions for I/O operations."""

import bz2
from collections import deque
from collections.abc import Iterable
from pathlib import Path
from typing import Any

import numpy as np

from irradiapy.io import (
    BZIP2LAMMPSReader,
    BZIP2LAMMPSWriter,
    LAMMPSLogReader,
    LAMMPSReader,
    LAMMPSWriter,
)
from irradiapy.utils.math import apply_boundary_conditions


def compress_file_bz2(
    input_path: str, output_path: str, compresslevel: int = 9
) -> None:
    """Write a bzip2-compressed copy of a file.

    The input is streamed in 1 MiB pieces, so the whole file is never held
    in memory at once.

    Parameters
    ----------
    input_path : str
        Path to the file that will be compressed.
    output_path : str
        Path of the bzip2 file to create.
    compresslevel : int, optional (default=9)
        Compression level handed to ``bz2.open``.
    """
    block_size = 1024 * 1024
    with open(input_path, "rb") as source:
        with bz2.open(output_path, "wb", compresslevel=compresslevel) as target:
            # An empty read marks the end of the input.
            while block := source.read(block_size):
                target.write(block)
def decompress_file_bz2(input_path: str, output_path: str) -> None:
    """Expand a bzip2-compressed file into a plain file.

    The compressed stream is read in 1 MiB pieces of decompressed data, so
    the whole content is never held in memory at once.

    Parameters
    ----------
    input_path : str
        Path to the .bz2 file to read.
    output_path : str
        Path of the decompressed file to create.
    """
    block_size = 1024 * 1024
    with bz2.open(input_path, "rb") as packed:
        with open(output_path, "wb") as plain:
            # An empty read marks the end of the compressed stream.
            while block := packed.read(block_size):
                plain.write(block)
def get_last_reader(
    reader: Iterable[dict[str, Any]],
) -> dict[str, Any]:
    """Get the last snapshot from a LAMMPS dump file using a reader.

    The reader is consumed completely; only the most recent snapshot is
    kept while iterating.

    Parameters
    ----------
    reader : Iterable[dict[str, Any]]
        An instance of a LAMMPS reader (any iterable of snapshots works).

    Returns
    -------
    dict[str, Any]
        The last snapshot yielded by the reader.

    Raises
    ------
    IndexError
        If the reader yields no snapshots.
    """
    # maxlen=1 makes the deque discard every snapshot but the newest one.
    tail = deque(reader, maxlen=1)
    if not tail:
        # Same exception type an empty deque's pop() would raise, but with a
        # message that points at the actual problem.
        raise IndexError(
            "Cannot get the last snapshot: the reader yielded no snapshots."
        )
    return tail[0]
def merge_lammps_snapshots(
    in_path: Path, out_path: Path, overwrite: bool = False
) -> dict[str, Any]:
    """Merge multiple snapshots in a LAMMPS file into a single snapshot.

    The merged snapshot takes its header fields (timestep, time, boundary and
    box limits) from the last snapshot of the input and concatenates the
    ``"atoms"`` arrays of all snapshots in file order.

    Parameters
    ----------
    in_path : Path
        Path to the input LAMMPS file (bzip2 compressed or not).
    out_path : Path
        Path to the output LAMMPS file (bzip2 compressed or not).
    overwrite : bool, optional (default=False)
        Whether to overwrite the output file if it exists.

    Returns
    -------
    dict[str, Any]
        A dictionary containing the merged snapshot data.

    Raises
    ------
    FileExistsError
        If `out_path` exists and `overwrite` is False.
    IndexError
        If the input file contains no snapshots.
    """
    if out_path.exists() and not overwrite:
        raise FileExistsError(f"Output file {out_path} already exists.")

    # Read the whole input before touching the output, so that a failed or
    # empty read never destroys an existing output file.
    if in_path.suffix == ".bz2":
        reader = BZIP2LAMMPSReader(in_path)
    else:
        reader = LAMMPSReader(in_path)
    atoms_per_snapshot = []
    last_snapshot = None
    try:
        for snapshot in reader:
            atoms_per_snapshot.append(snapshot["atoms"])
            last_snapshot = snapshot
    finally:
        reader.close()

    if last_snapshot is None:
        # IndexError is the type historically raised for an empty input.
        raise IndexError(f"Input file {in_path} contains no snapshots.")

    header_keys = (
        "timestep",
        "time",
        "boundary",
        "xlo",
        "xhi",
        "ylo",
        "yhi",
        "zlo",
        "zhi",
    )
    data_atoms_merged = {key: last_snapshot[key] for key in header_keys}
    data_atoms_merged["atoms"] = np.concatenate(atoms_per_snapshot)

    # The writers append, so a pre-existing file must be removed first.
    out_path.unlink(missing_ok=True)
    if out_path.suffix == ".bz2":
        writer = BZIP2LAMMPSWriter(out_path, mode="a")
    else:
        writer = LAMMPSWriter(out_path, mode="a")
    try:
        writer.write(data_atoms_merged)
    finally:
        writer.close()
    return data_atoms_merged
def apply_boundary_conditions_to_lammps(
    in_path: Path, out_path: Path, x: bool, y: bool, z: bool, overwrite: bool = False
) -> None:
    """Apply periodic boundary conditions to a LAMMPS dump file.

    Snapshots are processed one at a time: each one is read, passed through
    ``apply_boundary_conditions`` and written to the output before the next
    one is read.

    Parameters
    ----------
    in_path : Path
        Path to the input LAMMPS file (bzip2 compressed or not).
    out_path : Path
        Path to the output LAMMPS file (bzip2 compressed or not).
    x : bool
        Whether to apply periodic boundary conditions in the x direction.
    y : bool
        Whether to apply periodic boundary conditions in the y direction.
    z : bool
        Whether to apply periodic boundary conditions in the z direction.
    overwrite : bool, optional (default=False)
        Whether to overwrite the output file if it exists.

    Raises
    ------
    FileExistsError
        If `out_path` exists and `overwrite` is False.
    """
    if out_path.exists():
        if not overwrite:
            raise FileExistsError(f"Output file {out_path} already exists.")
        # The writers append, so a pre-existing file must be removed first.
        out_path.unlink()

    if in_path.suffix == ".bz2":
        reader = BZIP2LAMMPSReader(in_path)
    else:
        reader = LAMMPSReader(in_path)
    # Nested try/finally so both handles are released even when the
    # transformation or a write fails midway through the file.
    try:
        if out_path.suffix == ".bz2":
            writer = BZIP2LAMMPSWriter(out_path, mode="a")
        else:
            writer = LAMMPSWriter(out_path, mode="a")
        try:
            for snapshot in reader:
                wrapped = apply_boundary_conditions(
                    data_atoms=snapshot,
                    x=x,
                    y=y,
                    z=z,
                )
                writer.write(wrapped)
        finally:
            writer.close()
    finally:
        reader.close()
def lammps_to_mmonca(path_in: str, path_out: str, scale: float = 10.0) -> None:
    """Convert LAMMPS dump file to MMonCa format.

    Uses LAMMPSReader. For every snapshot, the number of defects is written
    on its own line, followed by one ``index kind x y z`` line per defect
    (indices start at 1). The mean number of defects per snapshot is printed
    at the end.

    Parameters
    ----------
    path_in : str
        Path to the input LAMMPS dump file.
    path_out : str
        Path to the output MMonCa file.
    scale : float, optional (default=10.0)
        Divisor applied to coordinates. The default converts angstroms to
        nanometers.

    Warning
    -------
    This converter is very basic: defects of type 0 are considered vacancies
    (V), and all other defect types are considered interstitials (I).
    Therefore, this will produce wrong results when multiple elements are
    present.
    """
    total = 0
    nions = 0
    reader = LAMMPSReader(path_in)
    # Make sure the reader is released even if the conversion fails midway.
    try:
        with open(path_out, "w", encoding="utf-8") as ofile:
            for data_defects in reader:
                defects = data_defects["atoms"]
                nions += 1
                ofile.write(f"{len(defects)}\n")
                for natom, defect in enumerate(defects, start=1):
                    x = defect["x"] / scale
                    y = defect["y"] / scale
                    z = defect["z"] / scale
                    kind = "V" if defect["type"] == 0 else "I"
                    ofile.write(f"{natom} {kind} {x} {y} {z}\n")
                total += len(defects)
    finally:
        reader.close()

    if nions == 0:
        # A mean over zero snapshots is undefined; report instead of dividing.
        print(f"No snapshots found in {path_in}; nothing to average.")
        return
    print(f"Total mean defects (vacs + sias): {total/nions}")