"""
The :class:`Job` class is the base class defining a BigDFT calculation.
"""
from __future__ import print_function, absolute_import
import os
import shutil
import subprocess
from threading import Timer
from copy import deepcopy
from mybigdft.iofiles import InputParams, Logfile
from mybigdft.iofiles.logfiles import GeoptLogfile
from mybigdft.iofiles.inputparams import clean
from .globals import BIGDFT_PATH, BIGDFT_TOOL_PATH, DEFAULT_PARAMETERS
class Job(object):
    r"""
    This class is meant to define a BigDFT calculation. :meth:`run` is
    its main method and it must be used in a context manager to ensure
    that the calculation is run in the desired directory.
    """
    def __init__(
        self,
        name="",
        inputparams=None,
        posinp=None,
        run_dir=None,
        ref_data_dir=None,
        skip=False,
        pseudos=False,
    ):
        r"""
        You may pass input parameters and/or initial geometry (posinp).
        Make sure to at least provide initial positions, either via the
        posinp or the input parameters.

        You may give a `name` for the calculation, used to name the
        input and output files written on disk (default naming
        conventions are used if not). You can also specify the directory
        where to run the calculation with `run_dir`.

        A reference calculation may be given in order to copy its data
        directory to the present calculation (main use: restart from the
        wavefunctions of the reference calculation).

        Parameters
        ----------
        inputparams : InputParams or None
            BigDFT input parameters.
        posinp : Posinp or None
            BigDFT initial geometry file.
        name : str
            Prefix of the BigDFT calculation (used to define the input
            and output file names).
        run_dir : str or None
            Folder where to run the calculation (default to current
            directory).
        ref_data_dir : str
            Path to the data directory of a reference BigDFT
            calculation.
        skip : bool
            If `True`, the calculation will be skipped. (Note: Might not
            be useful now, since we check for the existence of the
            logfile before running, which might be the actual check of
            the skip option of BigDFT.)
        pseudos : bool
            If `True`, the pseudopotential files stored in $PSEUDODIR
            will be used to complete the job.

        Raises
        ------
        ValueError
            If no initial positions are given in the posinp or the input
            parameters.

        A Job instance can be initialized by using a posinp only:

        >>> from mybigdft import Posinp, Atom
        >>> pos = Posinp(
        ...     [Atom('N', [2.9763078243490115e-23, 6.872205952043537e-23,
        ...                 0.01071619987487793]),
        ...      Atom('N', [-1.1043449194501671e-23, -4.873421744830746e-23,
        ...                 1.104273796081543])], "angstroem", "free"
        ... )
        >>> job = Job(posinp=pos, run_dir="tests")

        Default values are therefore used for the input parameters:

        >>> job.inputparams
        {}

        Input and output file names are defined from the `name` passed
        as argument. Here, no name is passed, so that default names are
        used:

        >>> job.input_name
        'input.yaml'
        >>> job.posinp_name
        'posinp.xyz'
        >>> job.logfile_name
        'log.yaml'

        The directories are defined from the `run_dir` argument:

        >>> import os
        >>> os.getcwd() == job.init_dir
        True
        >>> os.path.basename(job.init_dir) != 'tests'
        True
        >>> os.path.basename(job.run_dir)
        'tests'

        There is no logfile associated to the job yet as it was not run:

        >>> job.logfile == {}
        True

        To run the job, do it from a context manager:

        >>> with job as j:
        ...     j.run()
        ...
        /.../tests
        Logfile log.yaml already exists!
        <BLANKLINE>

        A logfile being found, it is read and not computed again:

        >>> job.logfile == {}
        False
        """
        # Check the input parameters of the calculation
        if inputparams is None:
            inputparams = InputParams()
        if posinp is None:
            # Fall back on the geometry stored in the input parameters
            posinp = inputparams.posinp
        if inputparams.posinp is None and posinp is None:
            raise ValueError("Please provide initial positions.")
        elif inputparams.posinp is not None and posinp != inputparams.posinp:
            # Both sources define a geometry: they must agree
            raise ValueError("inputparams and posinp do not define the same posinp.")
        # Set the base attributes
        inputparams.params = clean(inputparams.params)
        self.inputparams = inputparams
        self.posinp = posinp
        # Empty logfile until the calculation is actually run (or read
        # back from disk by :meth:`run`)
        self.logfile = Logfile()
        self.ref_data_dir = ref_data_dir
        self.name = name
        self.skip = skip
        self.is_completed = False
        self.pseudos = pseudos
        if self.pseudos:
            # NOTE(review): ixc = -101130 presumably selects the
            # exchange-correlation functional matching the $PSEUDODIR
            # pseudopotential files — confirm against BigDFT docs
            try:
                self.inputparams["dft"]["ixc"] = -101130
            except KeyError:
                # No "dft" section yet: create it
                self.inputparams["dft"] = {"ixc": -101130}
        # Derive the rest of the attributes from the other arguments
        self._set_directory_attributes(run_dir)
        self._set_filename_attributes()
        self._set_cmd_attributes()
@property
def name(self):
r"""
Returns
-------
str
Base name of the calculation used to set the names of
files and directories as well as the commands.
"""
return self._name
@name.setter
def name(self, name):
self._name = str(name)
@property
def inputparams(self):
r"""
Returns
-------
InputParams
Input parameters of the calculation.
"""
return self._inputparams
@inputparams.setter
def inputparams(self, inputparams):
self._inputparams = deepcopy(inputparams)
@property
def posinp(self):
r"""
Returns
-------
Posinp or None
Initial positions of the calculation.
"""
return self._posinp
@posinp.setter
def posinp(self, posinp):
self._posinp = posinp
@property
def logfile(self):
r"""
Returns
-------
Logfile or None
Logfile of the calculation (output of the bigdft or
bigdft-tool executable).
"""
return self._logfile
@logfile.setter
def logfile(self, logfile):
self._logfile = logfile
@property
def ref_data_dir(self):
r"""
Returns
-------
str
Reference directory where some relevant data (such as
wavefunctions) is stored.
"""
return self._ref_data_dir
@ref_data_dir.setter
def ref_data_dir(self, ref_data_dir):
self._ref_data_dir = ref_data_dir
@property
def pseudos(self):
r"""
Returns
-------
bool
if `True`, the calculation uses the pseudopotential files
in $PSEUDODIR (environment variable).
"""
return self._pseudos
@pseudos.setter
def pseudos(self, pseudos):
self._pseudos = pseudos
@property
def skip(self):
r"""
Returns
-------
bool
If `True`, the calculation will be skipped. (Note: Might not
be useful now, since we check for the existence of the
logfile before running, which might be the actual check of
the skip option of BigDFT.)
"""
return self._skip
@skip.setter
def skip(self, skip):
self._skip = bool(skip)
@property
def init_dir(self):
r"""
Returns
-------
str
Absolute path to the initial directory of the calculation
(can differ from :meth:`~mybigdft.job.Job.run_dir`).
"""
return self._init_dir
@init_dir.setter
def init_dir(self, init_dir):
self._init_dir = init_dir
@property
def run_dir(self):
r"""
Returns
-------
str
Absolute path to the directory where the calculation is run.
"""
return self._run_dir
@run_dir.setter
def run_dir(self, run_dir):
self._run_dir = run_dir
@property
def data_dir(self):
r"""
Returns
-------
str
Absolute path to the data directory of the calculation.
"""
return self._data_dir
@data_dir.setter
def data_dir(self, data_dir):
self._data_dir = data_dir
@property
def bigdft_tool_cmd(self):
r"""
Returns
-------
list
Base command to run the bigdft-tool executable.
"""
return self._bigdft_tool_cmd
@bigdft_tool_cmd.setter
def bigdft_tool_cmd(self, bigdft_tool_cmd):
self._bigdft_tool_cmd = bigdft_tool_cmd
@property
def bigdft_cmd(self):
r"""
Returns
-------
list
Base command to run the bigdft executable.
"""
return self._bigdft_cmd
@bigdft_cmd.setter
def bigdft_cmd(self, bigdft_cmd):
self._bigdft_cmd = bigdft_cmd
@property
def input_name(self):
r"""
Returns
-------
str
Name of the input parameters file.
"""
return self._input_name
@input_name.setter
def input_name(self, input_name):
self._input_name = input_name
@property
def posinp_name(self):
r"""
Returns
-------
str
Name of the input position file.
"""
return self._posinp_name
@posinp_name.setter
def posinp_name(self, posinp_name):
self._posinp_name = posinp_name
@property
def logfile_name(self):
r"""
Returns
-------
str
Name of the logfile.
"""
return self._logfile_name
@logfile_name.setter
def logfile_name(self, logfile_name):
self._logfile_name = logfile_name
@property
def is_completed(self):
r"""
Returns
-------
bool
`True` if the job has already run successfully.
"""
return self._is_completed
@is_completed.setter
def is_completed(self, is_completed):
self._is_completed = is_completed
def _set_directory_attributes(self, run_dir):
r"""
Set the attributes regarding the directories used to run the
calculation and to store data.
Parameters
----------
run_dir : str or None
Folder where to run the calculation.
"""
self._set_init_and_run_directories(run_dir)
self._set_data_directory()
def _set_init_and_run_directories(self, run_dir):
r"""
Set the attributes regarding the directories used to run the
calculation.
Parameters
----------
run_dir : str or None
Folder where to run the calculation.
"""
# Set the initial directory
self.init_dir = os.getcwd()
# Set the directory where the calculation will be run
if run_dir is None:
self.run_dir = self.init_dir
else:
# A run directory was given, find the common prefix with the
# current working directory
basename = os.path.commonprefix([self.init_dir, run_dir])
if basename == "":
# If there is no common prefix, then the run directory
# is already well defined, and the absolute directory is
# the concatenation of the current working directory and
# the run directory
self.run_dir = os.path.join(self.init_dir, run_dir)
else:
# Else, find the relative path with the common prefix to
# define run_dir, and use run_dir to define the
# absolute directory. The initial directory is changed to the
# common prefix.
self.init_dir = basename
new_run_dir = os.path.relpath(run_dir, start=basename)
self.run_dir = os.path.join(self.init_dir, new_run_dir)
# print("run_dir switched from {} to {}"
# .format(run_dir, new_run_dir))
def _set_data_directory(self):
r"""
Set the attributes regarding the directories used to store data.
"""
# Set the data directory
data_dir = "data" # base name for the BigDFT data directory
if self.name != "":
data_dir += "-" + self.name
self.data_dir = os.path.join(self.run_dir, data_dir)
def _set_cmd_attributes(self):
r"""
Set the base commands to run bigdft or bigdft-tool.
"""
# The base bigdft-tool command is always the same
self.bigdft_tool_cmd = [BIGDFT_TOOL_PATH]
if self.name:
self.bigdft_tool_cmd += ["--name", self.name]
# The base bigdft command depends on name and on skip
skip_option = []
if self.skip:
skip_option += ["-s", "Yes"]
if self.name != "":
self.bigdft_cmd = [BIGDFT_PATH, self.name] + skip_option
else:
self.bigdft_cmd = [BIGDFT_PATH] + skip_option
def _set_filename_attributes(self):
r"""
Set the attributes regarding the name of the input and output
files.
"""
if self.name != "":
self.input_name = self.name + ".yaml" # input file name
self.posinp_name = self.name + ".xyz" # posinp file name
self.logfile_name = "log-" + self.input_name # output file name
else:
self.input_name = "input.yaml" # input file name
self.posinp_name = "posinp.xyz" # posinp file name
self.logfile_name = "log.yaml" # output file name
[docs] def __enter__(self):
r"""
When entering the context manager:
* create the directory where the calculations must be run,
* go to that directory.
"""
if self.run_dir not in [".", ""]:
if not os.path.exists(self.run_dir):
os.makedirs(self.run_dir)
os.chdir(self.run_dir)
print(os.getcwd())
return self
[docs] def __exit__(self, *args):
r"""
When leaving the context manager, go back to the initial
directory.
"""
os.chdir(self.init_dir)
[docs] def run(
self,
nmpi=1,
nomp=1,
force_run=False,
dry_run=False,
restart_if_incomplete=False,
timeout=None,
):
r"""
Run the BigDFT calculation if it was not already performed.
The number of MPI and OpenMP tasks may be specified.
You may force the calculation to run even though it was
previously successful (*e.g.*, a logfile already exists) by
setting `force_run` to `True`.
If `dry_run` is set to `True`, then bigdft-tool is run instead
of the BigDFT executable.
If `restart_if_incomplete` is set to `True`, the previously
existing logfile is removed and the calculation restarts.
Parameters
----------
nmpi : int
Number of MPI tasks.
nomp : int
Number of OpenMP tasks.
force_run : bool
If `True`, the calculation is run even though a logfile
already exists.
dry_run : bool
If `True`, the input files are written on disk, but the
bigdft-tool command is run instead of the bigdft one.
restart_if_incomplete : bool
If `True`, the job is restarted if the existing logfile is
incomplete.
timeout : float or int or None
Number of minutes after which the job must be stopped.
"""
# Copy the data directory of a reference calculation
if self.ref_data_dir is not None:
# Copy the data directory only when bigdft has to run
if force_run or not os.path.exists(self.logfile_name):
self._copy_reference_data_dir()
# Always update the input file, so that it reads the
# reference wavefunctions in the data directory
if os.path.exists(self.data_dir):
self._read_wavefunctions_from_data_dir()
if dry_run or force_run or not os.path.exists(self.logfile_name):
# Run bigdft (if dry_run is False) or bigdft-tool (if
# dry_run is True)
self._set_environment(nomp)
self.write_input_files()
command = self._get_command(nmpi, dry_run)
output_msg = self._launch_calculation(command, timeout)
if dry_run:
self._write_bigdft_tool_output(output_msg)
else:
output_msg = output_msg.decode("unicode_escape")
print(output_msg)
try:
self.logfile = Logfile.from_file(self.logfile_name)
except ValueError as e:
if str(e) == "The logfile is incomplete!":
raise RuntimeError("Timeout exceded ({} minutes)".format(timeout))
if os.path.exists(self.data_dir):
self._clean_data_dir()
else:
# The logfile already exists: the initial positions and the
# initial parameters used to perform that calculation must
# correspond to the ones used to initialize the current job.
print("Logfile {} already exists!\n".format(self.logfile_name))
try:
self.logfile = Logfile.from_file(self.logfile_name)
except ValueError as e:
incomplete_log = str(e) == "The logfile is incomplete!"
if incomplete_log and restart_if_incomplete:
# Remove the logfile and restart the calculation
print("The logfile was incomplete, restart calculation")
os.remove(self.logfile_name)
self.run(
nmpi=nmpi,
nomp=nomp,
force_run=force_run,
dry_run=dry_run,
restart_if_incomplete=False,
timeout=timeout,
)
else:
raise e
else:
self._check_logfile_posinp()
self._check_logfile_inputparams()
self.is_completed = True
def _copy_reference_data_dir(self):
r"""
Copy the reference data directory to the current calculation
directory so as to restart the new calculation from the result
of the reference calculation.
"""
if os.path.exists(self.ref_data_dir):
if os.path.basename(self.data_dir) in os.listdir(os.curdir):
# Remove the previously existing data directory before
# copying the reference data directory (otherwise,
# shutil.copytree raises an error).
shutil.rmtree(self.data_dir)
shutil.copytree(self.ref_data_dir, self.data_dir)
print("Data directory copied from {}.".format(self.ref_data_dir))
else:
print("Data directory {} not found.".format(self.ref_data_dir))
def _read_wavefunctions_from_data_dir(self):
r"""
Set the input parameters to read the wavefunctions from the data
directory if they exist.
"""
# Check that there are wavefunction files
wf_files = [f for f in os.listdir(self.data_dir) if "wavefunction" in f]
if wf_files:
# If there are wavefunction files, add the
# option to read them from files.
try:
self.inputparams["dft"]["inputpsiid"] = 2
except KeyError:
self.inputparams["dft"] = {"inputpsiid": 2}
else:
# Else, delete the option from the input file, if
# it is equal to 2 (might be better than completely
# removing inputpsiid ?).
try:
if self.inputparams["dft"]["inputpsiid"] == 2:
del self.inputparams["dft"]["inputpsiid"]
except KeyError:
pass
@staticmethod
def _set_environment(nomp):
r"""
Set the number of OpenMP threads.
Parameters
----------
nomp : int
Number of OpenMP tasks.
"""
nomp = int(nomp) # Make sure you get an integer
if nomp > 1:
os.environ["OMP_NUM_THREADS"] = str(nomp)
def _get_command(self, nmpi, dry_run):
r"""
Returns
-------
command : list
The command to run bigdft if `dry_run` is set to `False`,
else the command to run bigdft-tool.
Parameters
----------
nmpi : int
Number of MPI tasks.
dry_run : bool
If `True`, the input files are written on disk, but the
bigdft-tool command is run instead of the bigdft one.
"""
nmpi = int(nmpi) # Make sure you get an integer
mpi_option = []
if dry_run:
if nmpi > 1:
mpi_option = ["-n", str(nmpi)]
command = self.bigdft_tool_cmd + mpi_option
else:
if nmpi > 1:
mpi_option = ["mpirun", "-np", str(nmpi)]
command = mpi_option + self.bigdft_cmd
return command
@staticmethod
def _launch_calculation(command, timeout):
r"""
Launch the command to run the bigdft or bigdft-tool command.
Parameters
----------
command : list
The command to run bigdft or bigdft-tool.
Raises
------
RuntimeError
If the calculation ended with an error message.
"""
# Print the command in a human readable way
to_str = "{} " * len(command)
command_msg = to_str.format(*command) + "..."
print(command_msg)
# Run the calculation for at most timeout minutes
run = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if timeout is None:
# 60 years timeout should be enough...
timeout = 60 * 365 * 24 * 60
timer = Timer(timeout * 60, run.kill)
try:
timer.start()
out, err = run.communicate()
error_msg = err.decode("unicode_escape")
finally:
timer.cancel()
# Raise an error if the calculation ended badly, else return the
# decoded output message
if error_msg != "":
raise RuntimeError(
"The calculation ended with the following error message:{}".format(
error_msg
)
)
return out
def _write_bigdft_tool_output(self, output_msg):
r"""
Write the output of the bigdft-tool command on disk.
Parameters
----------
output_msg : str
Output of the bigdft-tool command as a Logfile.
"""
log = Logfile.from_stream(output_msg)
log.write(self.logfile_name)
def _clean_data_dir(self):
r"""
Clean the data directory, namely delete the wavefunctions in
the data folder if it was not requested to output them, and
delete the output files of a geopt calculation if a geopt
was not performed.
"""
# Delete the wavefunction files in the data directory and
# replace them by empty files if needed.
inp = self.inputparams
default = DEFAULT_PARAMETERS["output"]["orbitals"]
write_orbitals = (
"output" in inp
and "orbitals" in inp["output"]
and inp["output"]["orbitals"] != default
)
if "output" not in inp or not write_orbitals:
wf_files = [
os.path.join(self.data_dir, filename)
for filename in os.listdir(self.data_dir)
if filename.startswith("wavefunction")
]
for wf_file in wf_files:
os.remove(wf_file)
# Equivalent to touch wf_file in bash
with open(wf_file, "a"):
os.utime(wf_file, None)
# Delete geopt data if no geopt was required
if "geopt" not in inp:
# Delete the posout files
posout_files = [
os.path.join(self.data_dir, filename)
for filename in os.listdir(self.data_dir)
if filename.startswith("posout")
]
for posout_file in posout_files:
os.remove(posout_file)
# Delete the geopt.mon file
try:
os.remove(os.path.join(self.data_dir, "geopt.mon"))
except OSError:
pass
def _check_logfile_posinp(self):
r"""
Check that the posinp used in the logfile corresponds to the one
used to initialize the job.
Raises
------
UserWarning
If the initial geometry of the job does not correspond to
the one of the Logfile previously read from the disk.
"""
if isinstance(self.logfile, GeoptLogfile):
log_pos = self.logfile.posinps[0]
else:
log_pos = self.logfile.posinp
if log_pos != self.posinp:
raise UserWarning(
"The initial geometry of this job do not correspond to the "
"one used in the Logfile:\n"
"Logfile posinp:\n{}Actual posinp:\n{}".format(log_pos, self.posinp)
)
def _check_logfile_inputparams(self):
r"""
Check that the input parameters used in the logfile correspond
to the ones used to initialize the job.
Raises
------
UserWarning
If the input parameters of the job does not correspond to
the one used in the Logfile previously read from the disk.
"""
log_inp = self.logfile.inputparams
base_inp = self.inputparams
# Clean the disablesym key:
disablesym_in_log_inp = "dft" in log_inp and "disablesym" in log_inp["dft"]
disablesym_not_in_log_inp = (
"dft" in log_inp and "disablesym" not in log_inp["dft"]
)
disablesym_in_base_inp = "dft" in base_inp and "disablesym" in base_inp["dft"]
disablesym_not_in_base_inp = (
"dft" in base_inp and "disablesym" not in base_inp["dft"]
)
# - if present only in the log_inp
if disablesym_in_log_inp and disablesym_not_in_base_inp:
del log_inp["dft"]["disablesym"]
log_inp._params = clean(log_inp.params)
# - if present only in the base_inp
if disablesym_not_in_log_inp and disablesym_in_base_inp:
del base_inp["dft"]["disablesym"]
base_inp._params = clean(log_inp.params)
if base_inp != log_inp:
raise UserWarning(
"The input parameters of this job do not correspond to the "
"ones used in the Logfile:\n"
"Logfile input parameters:\n{}\nActual input parameters:\n{}".format(
log_inp, base_inp
)
)
[docs] def clean(self, data_dir=False, logfiles_dir=False):
r"""
Delete all input and output files on disk as well as some
directories if required.
Parameters
----------
data_dir : bool
If `True`, removes the data directory that might exist.
logfiles : bool
If `True`, removes the logfiles directory that might exist.
.. Warning::
The directories are forced to be removed when the above-
mentioned options are set to `True`: use with caution.
"""
# Delete the input and output files
filenames = [
self.logfile_name,
self.input_name,
self.posinp_name,
"forces_" + self.posinp_name,
"forces.xyz",
"time.yaml",
"input_minimal.yaml",
self.name + "_minimal.yaml",
]
for filename in filenames:
try:
os.remove(filename)
except OSError:
pass
# Delete the required directories
directories = []
if data_dir:
directories += ["data", "data-" + self.name]
if logfiles_dir:
directories += ["logfiles"]
for directory in directories:
shutil.rmtree(directory, ignore_errors=True)