r"""
The workflows of the MyBigDFT package are meant to ease the calculation
of some interesting quantities requiring to launch multiple BigDFT
calculations by providing an API that focuses on the main parameters of
such nested calculations.
Here are defined an :class:`AbstractWorkflow` class (meant to be the
base class of all the other workflow classes implemented in the
workflows module) and a :class:`Workflow` class, which represents the
simplest way of implementing such a child class (intended to be used
when one wants to create a toy implementation of a new workflow).
"""
from __future__ import print_function, unicode_literals
import sys
import warnings
import abc

if sys.version_info >= (3, 4):  # pragma: no cover
    ABC = abc.ABC
else:  # pragma: no cover
    ABC = abc.ABCMeta(str("ABC"), (), {})


class AbstractWorkflow(ABC):
    r"""
    This abstract class is the base class of all the workflows of this
    module. It defines the queue of jobs as a list of
    :class:`~mybigdft.job.Job` instances, which are run sequentially
    when the :meth:`run` method is used.
    """

    POST_PROCESSING_ATTRIBUTES = []

    def __init__(self, queue):
        r"""
        Parameters
        ----------
        queue : list
            List of all the jobs to run.
        """
        self._initialize_post_processing_attributes()
        self._queue = queue

    def _initialize_post_processing_attributes(self):
        r"""
        Set all the post-processing attributes to their default value
        (`None`).
        """
        for attr in self.POST_PROCESSING_ATTRIBUTES:
            setattr(self, "_" + attr, None)

    @property
    def queue(self):
        r"""
        Returns
        -------
        list
            All the jobs of the workflow.
        """
        return self._queue

    @property
    def logfiles(self):
        r"""
        Returns
        -------
        dict
            A dictionary of all the logfiles of the workflow, with the
            name of the associated job as key.
        """
        return {job.name: job.logfile for job in self.queue}

    def run(
        self,
        nmpi=1,
        nomp=1,
        force_run=False,
        dry_run=False,
        restart_if_incomplete=False,
        timeout=None,
    ):
        r"""
        Run all the calculations if the post-processing was not already
        performed.

        .. Warning::

            If `force_run` or `dry_run` is set to `True`, any previous
            value of the post-processing attributes is deleted and set
            back to its default value, so that the post-processing is
            not considered as being performed.

        Parameters
        ----------
        nmpi : int
            Number of MPI tasks.
        nomp : int
            Number of OpenMP tasks.
        force_run : bool
            If `True`, the calculations are run even though a logfile
            already exists.
        dry_run : bool
            If `True`, the input files are written on disk, but the
            bigdft-tool command is run instead of the bigdft one.
        restart_if_incomplete : bool
            If `True`, the job is restarted if the existing logfile is
            incomplete.
        timeout : float or int or None
            Number of minutes after which each job must be stopped.

        Warns
        -----
        UserWarning
            If the post-processing was already completed, or if some
            jobs of the workflow were not run.
        """
        if force_run or dry_run:
            self._initialize_post_processing_attributes()
        if not self.is_completed:
            self._run(nmpi, nomp, force_run, dry_run, restart_if_incomplete, timeout)
        else:
            warning_msg = (
                "Calculations already performed; set the argument "
                "'force_run' to True to re-run them."
            )
            warnings.warn(warning_msg, UserWarning)
        if any(not job.is_completed for job in self.queue):
            warnings.warn("Some jobs of the workflow were not run.", UserWarning)

    @property
    def is_completed(self):
        r"""
        Returns
        -------
        bool
            `True` if all the post-processing attributes are set, i.e.
            no longer equal to their default value (`None`).
        """
        return all(
            getattr(self, attr) is not None
            for attr in self.POST_PROCESSING_ATTRIBUTES
        )

    def _run(self, nmpi, nomp, force_run, dry_run, restart_if_incomplete, timeout):
        r"""
        Run all the jobs in the queue sequentially and then, if not in
        `dry_run` mode, run the :meth:`post_proc` method.

        Parameters
        ----------
        nmpi : int
            Number of MPI tasks.
        nomp : int
            Number of OpenMP tasks.
        force_run : bool
            If `True`, the calculations are run even though a logfile
            already exists.
        dry_run : bool
            If `True`, the input files are written on disk, but the
            bigdft-tool command is run instead of the bigdft one.
        restart_if_incomplete : bool
            If `True`, the job is restarted if the existing logfile is
            incomplete.
        timeout : float or int or None
            Number of minutes after which each job must be stopped.
        """
        for job in self.queue:
            with job as j:
                j.run(
                    nmpi=nmpi,
                    nomp=nomp,
                    force_run=force_run,
                    dry_run=dry_run,
                    timeout=timeout,
                    restart_if_incomplete=restart_if_incomplete,
                )
        if not dry_run:
            self.post_proc()
            assert self.is_completed, (
                "You must define all post-processing attributes in post_proc."
            )

    @abc.abstractmethod
    def post_proc(self):
        r"""
        Abstract method meant to post-process the output of the
        calculations and extract meaningful results from them. It must
        set all the post-processing attributes of the workflow.
        """
        raise NotImplementedError
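

# A minimal sketch of a concrete child class, purely illustrative, that
# shows the pattern required by AbstractWorkflow: declare the
# post-processing attributes, build the queue in __init__, expose each
# attribute through a property and set them all in post_proc. Reading
# an ``energy`` attribute from the last logfile is an assumption made
# for the sake of the example.
class ToyEnergyWorkflow(AbstractWorkflow):
    r"""
    Toy workflow whose only result is the energy of the last job.
    """

    POST_PROCESSING_ATTRIBUTES = ["energy"]

    def __init__(self, queue):
        # A real workflow would typically build its own jobs here
        # instead of receiving them; the queue is simply forwarded.
        super(ToyEnergyWorkflow, self).__init__(queue)

    @property
    def energy(self):
        r"""
        Returns
        -------
        float or None
            Energy of the last job (`None` until post_proc was run).
        """
        return self._energy

    def post_proc(self):
        # Assumes each job of the queue has a logfile exposing an
        # energy attribute once it was run.
        self._energy = self.queue[-1].logfile.energy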


class Workflow(AbstractWorkflow):
    r"""
    This is a usable workflow one can play with, but without built-in
    post-processing: you can add jobs to the queue and run them as
    usual, but you must then code the post-processing yourself, in a
    separate function taking the workflow as parameter.

    This scheme can even be part of the development cycle of a new
    workflow to be later added to the MyBigDFT project: you first
    roughly define the jobs to be added to the queue of your workflow,
    and then develop a post-processing function taking your workflow as
    argument. Once you are happy with the result, it is easy to re-use
    most of that code to create a workflow class deriving from the
    :class:`AbstractWorkflow` class. To do so, you only have to
    override the `__init__` method to create the queue of jobs, define
    the post-processing attributes and create properties to access
    them, and finally override the `post_proc` method to make sure
    these attributes are properly set.
    """

    POST_PROCESSING_ATTRIBUTES = ["completed"]

    def __init__(self, queue=None):
        r"""
        Parameters
        ----------
        queue : list or None
            List of all the jobs to run (an empty queue is used if
            `None`).

        The queue can be empty:

        >>> wf = Workflow()
        >>> wf.queue
        []
        >>> wf.logfiles
        {}
        """
        if queue is None:
            queue = []
        super(Workflow, self).__init__(queue)

    @property
    def completed(self):
        r"""
        Returns
        -------
        bool or None
            `True` if the post_proc method was run successfully,
            `None` otherwise.
        """
        return self._completed

    def post_proc(self):
        r"""
        Set the post-processing attribute ``completed`` to `True`.
        """
        self._completed = True
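

if __name__ == "__main__":  # pragma: no cover
    # Small usage sketch of the Workflow class with an empty queue:
    # there is nothing to run, so only the post-processing is applied.
    # With actual mybigdft.Job instances in the queue, the results
    # would then be extracted by a separate function taking the
    # workflow as parameter, as explained in the Workflow docstring.
    wf = Workflow()
    print(wf.queue)         # []
    print(wf.is_completed)  # False: "completed" is still None
    wf.run()                # runs the (empty) queue, then post_proc
    print(wf.completed)     # True
    print(wf.is_completed)  # True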