Source code for mybigdft.jobschnet

r"""
The :class:`Jobschnet` class is the base class defining a SchnetPack calculation.
"""

from __future__ import print_function, absolute_import
import os
import torch
import numpy as np
from copy import deepcopy
from mybigdft import Posinp
from schnetpack.utils.script_utils.predict import predict


[docs]class Jobschnet(object): r""" This class defines a SchnetPack calculation similarly to the Job class for BigDFT. """ def __init__(self, name="", posinp=None): r""" Parameters ---------- name : str Name of the job. Will be used to name the created files. posinp : Posinp Base atomic positions for the job """ # Verify there are initial positions if posinp is None: raise ValueError("A JobSchnet instance has no initial positions.") elif not isinstance(posinp, list): posinp = [posinp] for pos in posinp: if not isinstance(pos, Posinp): raise TypeError( "Atomic Positions should be given in a list of mybigdft.Posinp instances." ) # Set the base attributes self.posinp = posinp self.number_of_structures = len(self.posinp) self.name = name self.logfile = Logfileschnet(self.posinp) self._set_filenames() @property def name(self): r""" Returns ------- str Base name of the calculation used to set the names of files and directories. """ return self._name @name.setter def name(self, name): self._name = str(name) @property def posinp(self): r""" Returns ------- Posinp Initial positions of the calculation """ return self._posinp @posinp.setter def posinp(self, posinp): self._posinp = posinp @property def number_of_structures(self): r""" Returns ------- int Number of different structures when the job is declared """ return self._number_of_structures @number_of_structures.setter def number_of_structures(self, number_of_structures): self._number_of_structures = int(number_of_structures) @property def logfile(self): r""" Returns ------- Logfileschnet Object empty or containing the results of the calculation. """ return self._logfile @logfile.setter def logfile(self, logfile): self._logfile = logfile @property def outfile_name(self): r""" Returns ------- str Name of the output file """ return self._outfile_name @outfile_name.setter def outfile_name(self, outfile_name): self._outfile_name = outfile_name def _set_filenames(self): if self.name != "": self.outfile_name = self.name + ".out" else: self.outfile_name = "outfile.out"
[docs] def run( self, model_dir=None, forces=False, device="cpu", write_to_disk=False, batch_size=128, overwrite=False, ): r""" Parameters ---------- model_dir: str Absolute path to the SchnetPack model to use in calculation forces : int or bool Order of the force calculations (0, 1 or 2) If 0 (or `False`), forces are not evaluated. If 1, forces are evaluated on first-order (6N calculations) If 2 (or `True`), forces are evaluated on second-order (12N calculations) device : str Either 'cpu' or 'cuda' to run on cpu or gpu write_to_disk : bool If `True`, an outfile will be written after the calculation. batch_size : int Size of the mini-batches used in predictions overwrite : bool If `True`, all .db files are removed from the directory before the calculation is done. """ # Verify model_dir if model_dir is None: raise ValueError("This job needs a path to a stored model.") if not isinstance(model_dir, str): raise TypeError("The path to the stored model must be a string.") try: model_dir = os.environ["MODELDIR"] + model_dir except KeyError: pass # Forces verification and preparation if isinstance(forces, bool): forces = 2 if forces else 0 if forces in [0, 1, 2]: if forces == 1: self._create_additional_structures(order=1) if forces == 2: self._create_additional_structures(order=2) else: raise ValueError( "Parameter `forces` should be a bool or a int between 0 and 2." ) # Verify device device = str(device) if device.startswith("cuda"): if not torch.cuda.is_available(): raise Warning("CUDA was asked for, but is not available.") # Verify batch_size if not isinstance(batch_size, int): try: batch_size == int(batch_size) except: raise TypeError("The mini-batches sizes are not defined correctly.") # Run the actual calculation raw_predictions = predict( modelpath=model_dir, posinp=self.posinp, name=self.name, device=device, disk_out=False, batch_size=batch_size, overwrite=overwrite, return_values=True, ) # Determine available properties if "energy_U0" in list(raw_predictions.keys()): raw_predictions["energy"] = raw_predictions.pop("energy_U0") raw_predictions["energy"] = raw_predictions["energy"].astype(np.float64) available_properties = list(raw_predictions.keys()) available_properties.remove("idx") # Format the predictions predictions = {} if not forces: for prop in available_properties: predictions[prop] = list(raw_predictions[prop]) else: # Calculate the forces pred_idx = 0 predictions["energy"], predictions["forces"] = [], [] for struct_idx in range(self.number_of_structures): predictions["energy"].append(raw_predictions["energy"][pred_idx][0]) pred_idx += 1 if forces == 1: predictions["forces"].append( self._calculate_forces( raw_predictions["energy"][ pred_idx : pred_idx + 6 * len(self._init_posinp[struct_idx]) ], order=forces, ) ) pred_idx += 6 * len(self._init_posinp[struct_idx]) elif forces == 2: predictions["forces"].append( self._calculate_forces( raw_predictions["energy"][ pred_idx : pred_idx + 12 * len(self._init_posinp[struct_idx]) ], order=forces, ) ) pred_idx += 12 * len(self._init_posinp[struct_idx]) self.logfile._update_results(predictions) # Reset self._posinp for more calculations try: self.posinp = deepcopy(self._init_posinp) except: pass if write_to_disk: # To improve? with open(self.outfile_name, "w") as out: for idx, struct in enumerate(self.posinp): out.write("Structure {}\n".format(idx)) out.write("-------------------\n") out.write("Energy : {}\n".format(self.logfile.energy[idx])) out.write("Forces : \n") np.savetxt(out, self.logfile.forces[idx]) out.write("\n")
def _create_additional_structures(self, order, deriv_length=0.015): r""" Creates the additional structures needed to do a numeric derivation of the energy to calculate the forces. """ if order not in [1, 2]: raise ValueError("Order of the forces calculation should be 1 or 2.") self._init_posinp = deepcopy(self.posinp) self._deriv_length = deriv_length all_structs = [] # First order forces calculation if order == 1: for str_idx, struct in enumerate(self.posinp): all_structs.append(struct) for factor in [1, -1]: for dim in [ np.array([1, 0, 0]), np.array([0, 1, 0]), np.array([0, 0, 1]), ]: all_structs.extend( [ struct.translate_atom( atom_idx, deriv_length * factor * dim ) for atom_idx in range(len(struct)) ] ) self.posinp = all_structs # Second order forces calculations elif order == 2: for str_idx, struct in enumerate(self.posinp): all_structs.append(struct) for factor in [2, 1, -1, -2]: for dim in [ np.array([1, 0, 0]), np.array([0, 1, 0]), np.array([0, 0, 1]), ]: all_structs.extend( [ struct.translate_atom( atom_idx, deriv_length * factor * dim ) for atom_idx in range(len(struct)) ] ) self.posinp = all_structs def _calculate_forces(self, predictions, order): r""" Method to calculate forces from the displaced atomic positions Parameters ---------- predictions : 1D numpy array (size 6*n_at) Contains the predictions obtained from the neural network Returns ------- forces : 2D numpy array (size (n_at, 3)) Forces for each structure """ if order == 1: nat = int(len(predictions) / 6) forces = np.zeros((nat, 3)) for i in range(3): ener1, ener2 = ( predictions[np.arange(i * nat, (i + 1) * nat, 1)], predictions[np.arange((i + 3) * nat, (i + 4) * nat, 1)], ) forces[:, i] = -(ener1 - ener2).reshape(nat) / (2 * self._deriv_length) elif order == 2: nat = int(len(predictions) / 12) forces = np.zeros((nat, 3)) for i in range(3): ener1, ener2, ener3, ener4 = ( predictions[np.arange(i * nat, (i + 1) * nat, 1)], predictions[np.arange((i + 3) * nat, (i + 4) * nat, 1)], predictions[np.arange((i + 6) * nat, (i + 7) * nat, 1)], predictions[np.arange((i + 9) * nat, (i + 10) * nat, 1)], ) forces[:, i] = -( (-ener1 + 8 * (ener2 - ener3) + ener4).reshape(nat) / (12 * self._deriv_length) ) else: raise ValueError("Order of the forces calculation should be 1 or 2.") return forces
[docs]class Logfileschnet(object): r""" Container class to emulate the Logfile object used in BigDFT calculations """ def __init__(self, posinp): self.posinp = posinp self.n_at = [] self.atom_types = [] self.boundary_conditions = [] self.cell = [] for struct in self.posinp: self.n_at.append(len(struct)) self.atom_types.append(set([atom.type for atom in struct])) self.boundary_conditions.append(struct.boundary_conditions) self.cell.append(struct.cell) self.energy = None self.forces = None self.dipole = None @property def posinp(self): r""" Returns ------- list of Posinps List containing the base Posinp objects for the predictions. """ return self._posinp @posinp.setter def posinp(self, posinp): self._posinp = posinp @property def n_at(self): r""" Returns ------- list of ints List containing the number of atoms of each structure. """ return self._n_at @n_at.setter def n_at(self, n_at): self._n_at = n_at @property def atom_types(self): r""" Returns ------- list of sets List containing sets of the elements present in each structure. """ return self._atom_types @atom_types.setter def atom_types(self, atom_types): self._atom_types = atom_types @property def boundary_conditions(self): r""" Returns ------- list of strings List containing the boundary conditions, either `free`, `surface` or `periodic`, of each structure. """ return self._boundary_conditions @boundary_conditions.setter def boundary_conditions(self, boundary_conditions): self._boundary_conditions = boundary_conditions @property def cell(self): r""" Returns ------- list of lists of floats List containing cell dimensions of each structure, None for free boundary conditions. """ return self._cell @cell.setter def cell(self, cell): self._cell = cell @property def energy(self): r""" Returns ------- list of floats or None List containing the energy value for each structure. If None, the energies have not been calculated yet. """ return self._energy @energy.setter def energy(self, energy): self._energy = energy @property def forces(self): r""" Returns ------- list of arrays of floats or None List containing the forces values for each structure. If None, the forces have not been calculated yet. """ return self._forces @forces.setter def forces(self, forces): self._forces = forces @property def dipole(self): r""" Returns ------- list of arrays of floats or None List containing the dipole values for each structure. If None, the dipoles have not been calculated yet. """ return self._dipole @dipole.setter def dipole(self, dipole): self._dipole = dipole def _update_results(self, predictions): r""" Method to store Jobschnet results in the Logfileschnet container. Useful for the workflows. Parameters ---------- predictions : dict Dictionary containing the predictions returned by the Schnetpack calculations """ available_properties = list(predictions.keys()) if "energy" in available_properties: self.energy = predictions["energy"] if "forces" in available_properties: self.forces = predictions["forces"] if "dipole" in available_properties: self.dipole = predictions["dipole"]