Source code for mybigdft.workflows.convergences

r"""
Here are defined the various classes allowing to perform the convergence
of a BigDFT calculation with respect to some input parameters,
including:

* AbstractConvergence, which is the base class for all the other classes
  here,
* HgridsConvergence for the grid steps of the wavelet grid in each
  direction,
* RmultConvergence for the grid extension of the wavelet grid(s).
"""

import os
import sys
import warnings
import abc
from copy import deepcopy
import numpy as np
from mybigdft import Job
from mybigdft.globals import EV_TO_HA
from mybigdft.workflows.workflow import AbstractWorkflow

if sys.version_info >= (3, 4):  # pragma: no cover
    ABC = abc.ABC
else:  # pragma: no cover
    ABC = abc.ABCMeta(str("ABC"), (), {})


__all__ = ["HgridsConvergence", "RmultConvergence"]


class AbstractConvergence(AbstractWorkflow, ABC):
    r"""
    This is the base class allowing to run a convergence procedure.
    Starting from a reference value for the considered input parameter,
    a queue of jobs is defined, each job with a value of that input
    parameter that should lead to results of decreasing quality.

    When the worklfow is run, the jobs are run by decreasing quality.
    After each job is run, its energy is compared to the reference
    energy and must lie in a given precision window, defined by an
    energy per atom to be defined as converged. Once a job is not in the
    acceptable range, this means that lower quality jobs won't be
    neither, so the workflow is stopped, even if more jobs are in the
    queue.
    """

    POST_PROCESSING_ATTRIBUTES = ["converged"]

    def __init__(
        self, base_job, reference, delta, n_jobs=10, precision_per_atom=0.01 * EV_TO_HA
    ):
        r"""
        One must provide a base `Job` instance defining the system
        considered and possibly non-default input parameters. Then, one
        must give the reference value of the input
        parameter for which the convergence is considered. A delta value
        of this input parameter is then used to initialize a given
        number of jobs (`n_jobs`) of decreasing quality. Finally, a
        precision per atom (in Ha) is required (default to 0.3675 mHa
        (= 10 meV) par atom).

        Parameters
        ----------
        base_job : Job
            Template for all the other jobs of this workflow.
        reference
            Reference input parameter, giving high quality results.
        delta
            Variation of of the input parameter between two runs.
        n_jobs : int
            Maximal number of jobs to be run.
        precision_per_atom : float
            Precision per atom for a job to be considered as converged,
            the reference energy being that of the job using the
            reference input parameter (units: Ha).
        """
        reference, delta = self._clean_initial_parameters(reference, delta)
        # Set all the important attributes
        self._base_job = base_job
        self._precision_per_atom = precision_per_atom
        queue = self._initialize_queue(reference, delta, n_jobs)
        super(AbstractConvergence, self).__init__(queue=queue)

    @property
    def base_job(self):
        r"""
        Returns
        -------
        Job
            Template for all the other jobs of this workflow.
        """
        return self._base_job

    @property
    def precision_per_atom(self):
        r"""
        Returns
        -------
        float
            Precision per atom for a job to be considered as converged.
        """
        return self._precision_per_atom

    @property
    def converged(self):
        r"""
        Returns
        -------
        Job
            Job with the highest hgrids and an energy that can be
            considered as converged (given a precision per atom).
        """
        return self._converged

    @abc.abstractmethod
    def summary(self, first_column_label, first_column_values):
        r"""
        Print a summary of the convergence workflow.

        This method must be called at the end of each child method
        :meth:`summary`.

        Parameters
        ----------
        first_column_label: str
            Label of the first column.
        first_column_values: list of str
            String representation of the value of the varied parameter
            for each job that ran.
        """
        shift = 2
        unformatted = ["{:.2e}", "{}"]
        other_labels = ["precision_per_atom (Ha)", "is_converged"]
        other_lengths = [len(label) + shift for label in other_labels]
        # Print the desired precision per atom
        print(
            "Requested precision per atom: {:.2e} (Ha)".format(self.precision_per_atom)
        )
        if first_column_values != []:
            # Print the header of the table
            first_column_length = len(first_column_values[0]) + shift
            first_column = first_column_label.center(first_column_length)
            other_columns = ""
            for label in other_labels:
                other_columns += label.center(len(label) + shift)
            header = first_column + other_columns
            print("-" * len(header))
            print(header)
            print("-" * len(header))
            # Print each line of the table
            for i, value in enumerate(first_column_values):
                first_column = value.center(first_column_length)
                job = self.queue[i]
                values = [job.precision_per_atom, job.is_converged]
                other_columns = ""
                for i, value in enumerate(values):
                    other_columns += (
                        unformatted[i].format(value).center(other_lengths[i])
                    )
                print(first_column + other_columns)

    @staticmethod
    @abc.abstractmethod
    def _clean_initial_parameters(reference, delta):
        r"""
        Clean the value of the initial parameters.

        Parameters
        ----------
        reference
            Reference input parameter, giving high quality results.
        delta
            Variation of of the input parameter between two runs.

        Returns
        -------
        tuple of length 2
            Reference value and delta value of the varied parameter.
        """
        raise NotImplementedError

    def _initialize_queue(self, reference, delta, n_jobs):
        r"""
        Initialize the jobs to be run in order to perform the hgrids
        convergence.

        Parameters
        ----------
        reference
            Reference input parameter, giving high quality results.
        delta
            Variation of of the input parameter between two runs.
        n_jobs : int
            Maximal number of jobs to be run.

        Returns
        -------
        list
            Queue of jobs to be run.
        """
        # Define the parameters to be used during this workflow
        param_variations = self._initialize_param_variations(reference, delta, n_jobs)
        # Set the queue of jobs according to the hgrids defined
        pos = self.base_job.posinp
        name = self.base_job.name
        queue = []
        for param in param_variations:
            # The input parameters and the run directory of the base job
            # are updated given the value of the parameter
            new_inp = self._new_inputparams(param)
            new_run_dir = self._new_run_dir(param)
            job = Job(posinp=pos, inputparams=new_inp, name=name, run_dir=new_run_dir)
            job.param = param
            queue.append(job)
        return queue

    @staticmethod
    @abc.abstractmethod
    def _initialize_param_variations(reference, delta, n_jobs):
        r"""
        Initialize the value of the input parameter that varies
        for each job to be created.

        Parameters
        ----------
        reference
            Reference input parameter, giving high quality results.
        delta
            Variation of of the input parameter between two runs.
        n_jobs : int
            Maximal number of jobs to be run.

        Returns
        -------
        list
            Values of the input parameter for all the jobs.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _new_inputparams(self, param):
        r"""
        Define the input parameters of the new job, according to the
        value of the varied parameter.

        Parameters
        ----------
        param
            Value of the varied parameter

        Returns
        -------
        InputParams
            Value of the new input parameters.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _new_run_dir(self, param):
        r"""
        Define the directory where the new job must run according
        to the value of the varied parameter.

        Parameters
        ----------
        param
            Value of the varied parameter

        Returns
        -------
        str
            Value of the new run_dir
        """
        raise NotImplementedError

    def _run(self, nmpi, nomp, force_run, dry_run, restart_if_incomplete, timeout):
        r"""
        This method runs the jobs until the hgrids are too high to
        stop giving results in the desired precision range.

        Parameters
        ----------
        nmpi : int
            Number of MPI tasks.
        nomp : int
            Number of OpenMP tasks.
        force_run : bool
            If `True`, the calculations are run even though a logfile
            already exists.
        dry_run : bool
            If `True`, the input files are written on disk, but the
            bigdft-tool command is run instead of the bigdft one.
        restart_if_incomplete : bool
            If `True`, the job is restarted if the existing logfile is
            incomplete.
        timeout : float or int or None
            Number of minutes after which each job must be stopped.

        Warns
        ------
        UserWarning
            If the job with minimal energy does not correspond to the
            job with minimal hgrids.
        """
        # Run the first job of the queue and get the reference energy
        with self.queue[0] as ref_job:
            ref_job.run(
                nmpi=nmpi,
                nomp=nomp,
                force_run=force_run,
                dry_run=dry_run,
                timeout=timeout,
                restart_if_incomplete=restart_if_incomplete,
            )
        ref_job.is_converged = True
        ref_job.precision_per_atom = 0.0
        self._converged = True
        min_en = ref_job.logfile.energy
        n_at = len(ref_job.posinp)
        # Run the jobs until the energy of a given run is above the
        # requested precision
        for i, job in enumerate(self.queue[1:]):
            if not self.queue[i].is_converged:
                # If the previous job is not converged, then neither is
                # this one.
                job.is_converged = False
            else:
                with job as j:
                    j.run(
                        nmpi=nmpi,
                        nomp=nomp,
                        force_run=force_run,
                        dry_run=dry_run,
                        timeout=timeout,
                        restart_if_incomplete=restart_if_incomplete,
                    )
                # Warn a UserWarning if the current job gives a lower
                # energy than the reference one
                en = job.logfile.energy
                if en <= min_en:
                    warnings.warn(self._too_low_energy_msg, UserWarning)
                # Assess if the job is converged or not
                job.precision_per_atom = (en - min_en) / n_at
                job.is_converged = job.precision_per_atom <= self.precision_per_atom
                if job.is_converged:
                    self._converged = job
        if not dry_run:
            self.post_proc()
            assert self.is_completed, (
                "You must define all post-processing " "attributes in post_proc."
            )

    @property
    @abc.abstractmethod
    def _too_low_energy_msg(self):
        r"""
        Define the message telling the user what steps he mighy consider
        to remove the UserWarning in :method:`_run`.

        Raises
        ------
        NotImplementedError
        """
        raise NotImplementedError

    def post_proc(self):
        r"""
        Warns
        -----
        UserWarning
            If only the reference job can be considered as converged or
            if all the jobs are below the requested precision.
        """
        # Warn the user if he is too conservative
        if self.queue[-1].is_converged:
            warnings.warn(self._all_converged_msg, UserWarning)
        # Warn the user if he must consider using parameters giving
        # better results
        if not self.queue[1].is_converged:
            warnings.warn(self._only_reference_converged_msg, UserWarning)

    @property
    @abc.abstractmethod
    def _all_converged_msg(self):
        r"""
        Define the message warning the user when he is being too
        conservative with the current set of varied input parameters.

        Raises
        ------
        NotImplementedError
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def _only_reference_converged_msg(self):
        r"""
        Define the message warning the user that he should probably
        increase the quality of the calculations.

        Raises
        ------
        NotImplementedError
        """
        raise NotImplementedError


[docs]class HgridsConvergence(AbstractConvergence): r""" BigDFT uses wavelets on a grid to represent the wavefunctions and density of the system. One key parameter of a BigDFT calculation therefore is the grid spacing (noted hgrids) between two grid points (which must be defined in the three directions of space). This class allows to run all the necessary calculations to determine the largest hgrids which must be used so that the energy error compared to the reference calculation (with the lowest hgrid considered) lie within the required precision per atom. """
[docs] def summary(self): r""" Print a summary of the results of the hgrids convergence workflow. """ first_column_values = [ "[{:.2f}, {:.2f}, {:.2f}]".format(*job.param) for job in self.queue if job.is_completed ] super(HgridsConvergence, self).summary("hgrids", first_column_values)
@staticmethod def _clean_initial_parameters(reference, delta): r""" Clean the value of the initial parameters. Returns ------- tuple made of two numpy arrays of length 3 Reference value and delta value of hgrids. """ # Make sure all the received hgrids are lists of length 3 if not isinstance(delta, list): delta = [float(delta)] * 3 if not isinstance(reference, list): reference = [float(reference)] * 3 for hgrids in [reference, delta]: assert len(hgrids) == 3, "{} not of length 3".format(hgrids) # Make sure the signs in delta are positive delta = np.array([abs(value) for value in delta], dtype=float) return np.array(reference, dtype=float), delta @staticmethod def _initialize_param_variations(reference, delta, n_jobs): r""" Initialize the value of hgrids for each job to be created. Parameters ---------- reference Reference input parameter, giving high quality results. delta Variation of of the input parameter between two runs. n_jobs : int Maximal number of jobs to be run. Returns ------- list Values of hgrids for all the jobs. """ return [ (reference + i * delta).round(decimals=2).tolist() for i in range(n_jobs) ] def _new_inputparams(self, param): r""" Define the input parameters of the new job, according to the value of hgrids. Parameters ---------- param Value of hgrids Returns ------- InputParams Value of the new input parameters. """ new_inp = deepcopy(self.base_job.inputparams) if "dft" not in new_inp: new_inp["dft"] = {"hgrids": param} else: new_inp["dft"]["hgrids"] = param return new_inp def _new_run_dir(self, param): r""" Define the directory where the new job must run according to the value of hgrids. Parameters ---------- param Value of hgrids Returns ------- str Value of the new run_dir """ base_run_dir = self.base_job.run_dir return os.path.join(base_run_dir, "{:.2f}_{:.2f}_{:.2f}".format(*param)) @property def _too_low_energy_msg(self): # pragma: no cover r""" Returns ------- str Message telling the user what steps he mighy consider to remove the UserWarning he is going to receive because a lower quality calculation gave a lower energy than the reference calculation. """ return ( "The job with minimal energy does not correspond to the job " "with minimal hgrids. Consider increasing rmult or the " "reference hgrids." ) @property def _all_converged_msg(self): # pragma: no cover r""" Returns ------- str Message warning the user he is being too conservative with this set of varied input parameters. """ return ( "You may want to test higher hgrids as there is still room " "to increase hgrids while staying below the requested " "precision per atom." ) @property def _only_reference_converged_msg(self): # pragma: no cover r""" Returns ------- str Message warning the user he should probably increase the quality of the calculations. """ return ( "You may want to consider smaller values of hgrids to make " "sure convergence was achieved." )
[docs]class RmultConvergence(AbstractConvergence): r""" BigDFT uses wavelets on a grid to represent the wavefunctions and density of the system. One key parameter of a BigDFT calculation therefore is the grid extension (noted rmult). There are actually two grids centered on the atoms of the system: a coarse grid, with a shorter extension and one fine grid, with a longer extension. This class allows to run all the necessary calculations to determine the smallest rmult which must be used so that the energy error compared to the reference calculation (with the largest grid extension considered) lie within the required precision per atom. """
[docs] def summary(self): r""" Print a summary of the results of the rmult convergence workflow. """ first_column_values = [ "[{:.1f}, {:.1f}]".format(*job.param) for job in self.queue if job.is_completed ] super(RmultConvergence, self).summary("rmult", first_column_values)
@staticmethod def _clean_initial_parameters(reference, delta): r""" Clean the value of the initial parameters. Returns ------- tuple made of two numpy arrays of length 2 Reference value and delta value of rmult. """ # Make sure the rmults are lists of length 2 for rmult in [reference, delta]: assert len(rmult) == 2, "{} not of length 2".format(rmult) # Make sure the signs in delta are positive delta = np.array([-abs(value) for value in delta]) return np.array(reference, dtype=float), delta @staticmethod def _initialize_param_variations(reference, delta, n_jobs): r""" Initialize the value of rmult for each job to be created. Parameters ---------- reference Reference input parameter, giving high quality results. delta Variation of of the input parameter between two runs. n_jobs : int Maximal number of jobs to be run. Returns ------- list Values of rmults for all the jobs. """ return [ (reference + i * delta).round(decimals=1).tolist() for i in range(n_jobs) ] def _new_inputparams(self, param): r""" Define the input parameters of the new job, according to the value of rmult. Parameters ---------- param Value of rmult Returns ------- InputParams Value of the new input parameters. """ new_inp = deepcopy(self.base_job.inputparams) if "dft" not in new_inp: new_inp["dft"] = {"rmult": param} else: new_inp["dft"]["rmult"] = param return new_inp def _new_run_dir(self, param): r""" Define the directory where the new job must run according to the value of rmult. Parameters ---------- param Value of rmult Returns ------- str Value of the new run_dir """ base_run_dir = self.base_job.run_dir return os.path.join(base_run_dir, "{:.1f}_{:.1f}".format(*param)) @property def _too_low_energy_msg(self): r""" Returns ------- str Message telling the user what steps he mighy consider to remove the UserWarning he is going to receive because a lower quality calculation gave a lower energy than the reference calculation. """ return ( "The job with minimal energy does not correspond to the job " "with maximal rmult. Consider decreasing hgrids or the " "reference rmult." ) @property def _all_converged_msg(self): r""" Returns ------- str Message warning the user he is being too conservative with this set of varied input parameters. """ return ( "You may want to test lower rmult as there is still room to " "decrease rmult while staying below the requested precision " "per atom." ) @property def _only_reference_converged_msg(self): r""" Returns ------- str Message warning the user he should probably increase the quality of the calculations. """ return ( "You may want to consider larger values of rmult to make sure " "convergence was achieved." )