Source code for tcutility.job.adf

import os

from scm import plams

from tcutility import data, formula, log, molecule, results, spell_check
from tcutility.errors import TCCompDetailsError, TCJobError
from tcutility.job.ams import AMSJob
from tcutility.job.generic import Job

j = os.path.join


[docs] class ADFJob(AMSJob): def __init__(self, *args, **kwargs): self.settings = results.Result() self._functional = None self._core = None self.solvent("vacuum") self.basis_set("TZ2P") self.quality("Good") self.SCF(thresh=1e-8) self.single_point() # by default print the fock matrix self.settings.input.adf.print = "SFOSiteEnergies" super().__init__(*args, **kwargs) def __str__(self): return f"{self._task}({self._functional}/{self._basis_set}), running in {os.path.join(os.path.abspath(self.rundir), self.name)}"
[docs] def basis_set(self, typ: str = "TZ2P", core: str = "None"): """ Set the basis-set type and frozen core approximation for this calculation. Args: typ: the type of basis-set to use. Default is TZ2P. core: the size of the frozen core approximation. Default is None. Raises: ValueError: if the basis-set name or core is incorrect. .. note:: If the selected functional is the r2SCAN-3c functional, then the basis-set will be set to mTZ2P. .. seealso:: :mod:`tcutility.data.basis_sets` for an overview of the available basis-sets in ADF. """ spell_check.check(typ, data.basis_sets.available_basis_sets["ADF"], ignore_case=True) spell_check.check(core, ["None", "Small", "Large"], ignore_case=True) if self._functional == "r2SCAN-3c" and typ != "mTZ2P": log.warn(f"Basis set {typ} is not allowed with r2SCAN-3c, switching to mTZ2P.") typ = "mTZ2P" self._basis_set = typ self._core = core self.settings.input.adf.basis.type = typ self.settings.input.adf.basis.core = core
[docs] def spin_polarization(self, val: int): """ Set the spin-polarization of the system. If the value is not zero the calculation will also be unrestricted. """ self.settings.input.adf.SpinPolarization = val if val != 0: self.settings.input.adf.Unrestricted = "Yes"
[docs] def multiplicity(self, val: int): """ Set the multiplicity of the system. If the value is not one the calculation will also be unrestricted. We use the following values: 1) singlet 2) doublet 3) triplet 4) ... The multiplicity is equal to 2*S+1 for spin-polarization S. """ self.settings.input.adf.SpinPolarization = (val - 1) // 2 if val != 1: self.settings.input.adf.Unrestricted = "Yes"
[docs] def unrestricted(self, val: bool): """ Whether the calculation should be unrestricted. """ self.settings.input.adf.Unrestricted = "Yes" if val else "No"
[docs] def occupations(self, strategy: str): """ Set the orbital filling strategy for ADF. Args: strategy: the name of the filling strategy. This can contain multiple of the options allowed. .. seealso:: The SCM documentation can be found at https://www.scm.com/doc/ADF/Input/Electronic_Configuration.html#aufbau-smearing-freezing """ self.settings.input.adf.Occupations = strategy
[docs] def quality(self, val: str = "Good"): """ Set the numerical quality of the calculation. Args: val: the numerical quality value to set to. This is the same as the ones used in the ADF GUI. Defaults to Good. Raises: ValueError: if the quality value is incorrect. """ spell_check.check(val, ["Basic", "Normal", "Good", "VeryGood", "Excellent"], ignore_case=True) self.settings.input.adf.NumericalQuality = val
[docs] def SCF_convergence(self, thresh: float = 1e-8): """ Set the SCF convergence criteria for the job. Args: thresh: the convergence criteria for the SCF procedure. Defaults to 1e-8. .. deprecated:: 0.9.2 Please use :meth:`ADFJob.SCF` instead of this method. """ log.warn("This method has been deprecated, please use ADFJob.SCF instead.") self.SCF(thresh=thresh)
[docs] def SCF(self, iterations: int = 300, thresh: float = 1e-8): """ Set the SCF settings for this calculations. Args: iterations: number of iterations to perform for this calculation. Defaults to 300. thresh: the convergence criteria for the SCF procedure. Defaults to 1e-8. .. note:: When setting the number of iterations to 0 or 1 the ``AlwaysClaimSuccess`` key will also be set to ``Yes``. This is to prevent the job from being flagged as a failure when reading it using :mod:`tcutility.results`. """ self.settings.input.adf.SCF.Iterations = iterations self.settings.input.adf.SCF.Converge = thresh if iterations in [0, 1]: self.settings.input.ams.EngineDebugging.AlwaysClaimSuccess = "Yes"
[docs] def functional(self, funtional_name: str, dispersion: str = None): """ Set the functional to be used by the calculation. This also sets the dispersion if it is specified in the functional name. Args: funtional_name: the name of the functional. The value can be the same as the ones used in the ADF GUI. dispersion: dispersion setting to use with the functional. This is used when you want to use a functional from LibXC. Raises: ValueError: if the functional name is not recognized. .. note:: Setting the functional to r2SCAN-3c will automatically set the basis-set to mTZ2P. .. seealso:: :mod:`tcutility.data.functionals` for an overview of the available functionals in ADF. """ # before adding the new functional we should clear any previous functional settings self.settings.input.adf.pop("XC", None) functional = funtional_name.strip() functional = functional.replace("-D4(EEQ)", "-D4") # D4(EEQ) and D4 are the same, unlike D3 and D3(BJ) self._functional = functional if functional == "r2SCAN-3c" and self._basis_set != "mTZ2P": log.info(f"Switching basis set from {self._basis_set} to mTZ2P for r2SCAN-3c.") self.basis_set("mTZ2P") if functional == "SSB-D": raise TCCompDetailsError(section="Functional", message='There are two functionals called SSB-D, please use "GGA:SSB-D" or "MetaGGA:SSB-D".') if not data.functionals.get(functional): raise TCCompDetailsError(section="Functional", message=f"XC-functional {functional} not found.") else: func = data.functionals.get(functional) self.settings.input.adf.update(func.adf_settings)
[docs] def relativity(self, level: str = "Scalar"): """ Set the treatment of relativistic effects for this calculation. Args: level: the level to set. Can be the same as the values in the ADF GUI and documentation. By default it is set to Scalar. Raises: ValueError: if the relativistic correction level is not correct. """ spell_check.check(level, ["Scalar", "None", "Spin-Orbit"], ignore_case=True) self.settings.input.adf.relativity.level = level
[docs] def solvent(self, name: str = None, eps: float = None, rad: float = None, use_klamt: bool = False): """ Model solvation using COSMO for this calculation. Args: name: the name of the solvent you want to use. Please see the ADF manual for a list of available solvents. eps: the dielectric constant of your solvent. You can use this in place of the solvent name if you need more control over COSMO. rad: the radius of the solvent molecules. You can use this in place of the solvent name if you need more control over COSMO. use_klamt: whether to use the klamt atomic radii. This is usually used when you have charged species (?). Raises: ValueError: if the solvent name is given, but incorrect. .. seealso:: :mod:`tcutility.data.cosmo` for an overview of the available solvent names and formulas. """ if name: spell_check.check(name, data.cosmo.available_solvents, ignore_case=True, insertion_cost=0.3) self._solvent = name else: self._solvent = f"COSMO(eps={eps} rad={rad})" if name == "vacuum": self.settings.input.adf.pop("Solvation", None) return self.settings.input.adf.Solvation.Surf = "Delley" solv_string = "" if name: solv_string += f"name={name} " else: self.settings.input.adf.Solvation.Solv = f"eps={eps} rad={rad} " if use_klamt: solv_string += "cav0=0.0 cav1=0.0" else: solv_string += "cav0=0.0 cav1=0.0067639" self.settings.input.adf.Solvation.Solv = solv_string self.settings.input.adf.Solvation.Charged = "method=CONJ corr" self.settings.input.adf.Solvation["C-Mat"] = "POT" self.settings.input.adf.Solvation.SCF = "VAR ALL" self.settings.input.adf.Solvation.CSMRSP = None if use_klamt: radii = {"H": 1.30, "C": 2.00, "N": 1.83, "O": 1.72, "F": 1.72, "Si": 2.48, "P": 2.13, "S": 2.16, "Cl": 2.05, "Br": 2.16, "I": 2.32} self.settings.input.adf.solvation.radii = radii
[docs] def symmetry(self, group: str): """ Specify the symmetry group to be used by ADF. Args: group: the symmetry group to be used. """ self.settings.input.adf.Symmetry = group
def _check_job(self): # if we have spinpolarization we do not want to use DFTB to calculate the initial hessian if self.settings.input.adf.SpinPolarization != 0 and self.settings.input.ams.GeometryOptimization.InitialHessian.Type == "CalculateWithFastEngine": # we simply remove it from the geometryoptimization block self.settings.input.ams.GeometryOptimization.pop("InitialHessian", None)
[docs] class ADFFragmentJob(ADFJob): def __init__(self, *args, **kwargs): self.decompose_elstat = kwargs.pop('decompose_elstat', False) self.child_jobs = {} # self.child_jobs_no_electrons = {} super().__init__(*args, **kwargs) self.name = "EDA"
[docs] def add_fragment(self, mol: plams.Molecule, name: str = None, charge: int = 0, spin_polarization: int = 0): """ Add a fragment to this job. Optionally give the name, charge and spin-polarization of the fragment as well. Args: mol: the molecule corresponding to the fragment. name: the name of the fragment. By default it will be set to ``fragment{N+1}`` if ``N`` is the number of fragments already present. charge: the charge of the fragment to be added. spin_polarization: the spin-polarization of the fragment to be added. """ # in case of giving fragments as indices we dont want to add the fragment to the molecule later add_frag_to_mol = True # we can be given a list of atoms if isinstance(mol, list) and isinstance(mol[0], plams.Atom): mol_ = plams.Molecule() [mol_.add_atom(atom) for atom in mol] mol = mol_ # or a list of integers if isinstance(mol, list) and isinstance(mol[0], int): if not self._molecule: log.error(f"Trying to add fragment based on atom indices, but main job does not have a molecule yet. Call the {self.__class__.__name__}.molecule method to add one.") return mol_ = plams.Molecule() [mol_.add_atom(self._molecule[i]) for i in mol] mol = mol_.copy() add_frag_to_mol = False # check if the atoms in the new fragment are already present in the other fragments. # if it is we should raise an error for child in self.child_jobs.values(): if any((atom.symbol, atom.coords) == (myatom.symbol, myatom.coords) for atom in child._molecule for myatom in mol): raise TCJobError(job_class=self.__class__.__name__, message="The atoms in the new fragment are already present in the other fragments.") name = name or f"fragment{len(self.child_jobs) + 1}" self.child_jobs[name] = ADFJob(test_mode=self.test_mode) self.child_jobs[name].molecule(mol) self.child_jobs[name].charge(charge) self.child_jobs[name].spin_polarization(spin_polarization) setattr(self, name, self.child_jobs[name]) if not add_frag_to_mol: return if self._molecule is None: self._molecule = self.child_jobs[name]._molecule.copy() else: for atom in self.child_jobs[name]._molecule.copy(): if any((atom.symbol, atom.coords) == (myatom.symbol, myatom.coords) for myatom in self._molecule): continue self._molecule.add_atom(atom)
[docs] def guess_fragments(self): """ Guess what the fragments are based on data stored in the molecule provided for this job. This will automatically set the correct fragment molecules, names, charges and spin-polarizations. .. seealso:: | :func:`tcutility.molecule.guess_fragments` for an explanation of the xyz-file format required to guess the fragments. | :meth:`ADFFragmentJob.add_fragment` to manually add a fragment. .. note:: This function will be automatically called if there were no fragments given to this calculation. """ frags = molecule.guess_fragments(self._molecule) if frags is None: log.error("Could not load fragment data for the molecule.") return False for fragment_name, fragment in frags.items(): charge = fragment.flags.get("charge", 0) spin_polarization = fragment.flags.get("spinpol", 0) self.add_fragment(fragment, fragment_name, charge=charge, spin_polarization=spin_polarization) return True
[docs] def remove_virtuals(self, frag=None, subspecies=None, nremove=None): """ Remove virtual orbitals from the fragments. Args: frag: the fragment to remove virtuals from. If set to ``None`` we remove all virtual orbitals of all fragments. subspecies: the symmetry subspecies to remove virtuals from. If set to ``None`` we assume we have ``A`` subspecies. nremove: the number of virtuals to remove. If set to ``None`` we will guess the number of virtuals based on the basis-set chosen. """ if frag is None: self.settings.input.adf.RemoveAllFragVirtuals = "Yes" return # if nremove is not given we will get it from the atoms in the fragment if nremove is None: # guess the virtual numbers only works for non-frozen-core calculations if self._core.lower() != "none": raise TCJobError(job_class=self.__class__.__name__, message="Cannot guess number of virtual orbitals for calculations with a frozen core.") # the basis-set has to be present in the prepared data if self._basis_set.lower() not in [bs.lower() for bs in data.basis_sets._number_of_orbitals.keys()]: raise TCJobError(job_class=self.__class__.__name__, message=f"Cannot guess number of virtual orbitals for calculations with the {self._basis_set} basis-set.") # sum up the number of virtuals per atom in the fragment nremove = 0 for atom in self.child_jobs[frag]._molecule: nremove += data.basis_sets.number_of_virtuals(atom.symbol, self._basis_set) # positive charge adds a virtual and negative removes a virtual nremove += self.child_jobs[frag].settings.input.ams.System.charge or 0 self.settings.input.adf.setdefault("RemoveFragOrbitals", "") self.settings.input.adf.RemoveFragOrbitals += f""" {frag} {subspecies or 'A'} {nremove} SubEnd End """
[docs] def run(self): """ Run the ``ADFFragmentJob``. This involves setting up the calculations for each fragment as well as the parent job. It will also submit a calculation with the SCF iterations set to 0 in order to facilitate investigation of the field effects using PyOrb. """ # check if the user defined fragments for this job if len(self.child_jobs) == 0: log.warn("Fragments were not specified yet, trying to read them from the xyz file ...") # if they did not define the fragments, try to guess them using the xyz-file if not self.guess_fragments(): log.error("Please specify the fragments using ADFFragmentJob.add_fragment or specify them in the xyz file.") raise mol_str = " + ".join([formula.molecule(child._molecule) for child in self.child_jobs.values()]) log.flow(f"ADFFragmentJob [{mol_str}]", ["start"]) # obtain some system wide properties of the molecules charge = sum([child.settings.input.ams.System.charge or 0 for child in self.child_jobs.values()]) unrestricted = any([(child.settings.input.adf.Unrestricted or "no").lower() == "yes" for child in self.child_jobs.values()]) spinpol = sum([child.settings.input.adf.SpinPolarization or 0 for child in self.child_jobs.values()]) log.flow(f"Level: {self._functional}/{self._basis_set}") log.flow(f"Solvent: {self._solvent}") log.flow(f"Charge: {charge}", ["straight"]) log.flow(f"Unrestricted: {unrestricted}", ["straight"]) log.flow(f"Spin-Polarization: {spinpol}", ["straight"]) log.flow() # this job and all its children should have the same value for unrestricted [child.unrestricted(unrestricted) for child in self.child_jobs.values()] # propagate the post- and preambles to the child_jobs [child.add_preamble(preamble) for preamble in self._preambles for child in self.child_jobs.values()] [child.add_postamble(postamble) for postamble in self._postambles for child in self.child_jobs.values()] # we now update the child settings with the parent settings # this is because we have to propagate settings such as functionals, basis sets etc. sett = self.settings.as_plams_settings() # first create a plams settings object # same for the children child_setts = {name: child.settings.as_plams_settings() for name, child in self.child_jobs.items()} # update the children using the parent settings [child_sett.update(sett) for child_sett in child_setts.values()] [child_sett.input.adf.pop("RemoveFragOrbitals", None) for child_sett in child_setts.values()] [child_sett.input.adf.pop("RemoveAllFragVirtuals", None) for child_sett in child_setts.values()] # same for sbatch settings [child.sbatch(**self._sbatch) for child in self.child_jobs.values()] # now set the charge, spinpol, unrestricted for the parent self.charge(charge) self.spin_polarization(spinpol) self.unrestricted(unrestricted) if unrestricted: self.settings.input.adf.UnrestrictedFragments = "Yes" elstat_jobs = {} # now we are going to run each child job for i, (child_name, child) in enumerate(self.child_jobs.items(), start=1): # the child name will be prepended with SP showing that it is the singlepoint calculation child.name = f'frag_{child_name}' child.rundir = j(self.rundir, self.name) # # add the path to the child adf.rkf file as a dependency to the parent job self.settings.input.adf.fragments[child_name] = j(child.workdir, 'adf.rkf') # recast the plams.Settings object into a Result object as that is what run expects child.settings = results.Result(child_setts[child_name]) log.flow(f'Fragment ({i}/{len(self.child_jobs)}) {child_name} [{formula.molecule(child._molecule)}]', ['split']) log.flow(f'Charge: {child.settings.input.ams.System.charge}', ['straight', 'straight']) log.flow(f'Spin-Polarization: {child.settings.input.adf.SpinPolarization}', ['straight', 'straight']) log.flow(f'Work dir: {child.workdir}', ['straight', 'straight']) if child.can_skip(): log.flow(log.Emojis.warning + " Already ran, skipping", ["straight", "end"]) log.flow() else: log.flow(log.Emojis.good + ' Submitting', ['straight', 'end']) child.run() self.dependency(child) log.flow(f'SlurmID: {child.slurm_job_id}', ['straight', 'skip', 'end']) log.flow() if self.decompose_elstat: child_STOFIT = ADFJob(child) child_STOFIT.name += '_STOFIT' elstat_jobs[child_STOFIT.name] = child_STOFIT child_STOFIT.settings.input.adf.STOFIT = '' child_STOFIT.settings.input.adf.PRINT = 'SFOSiteEnergies Elstat' child_STOFIT.settings.input.adf.pop("NumericalQuality") child_STOFIT.settings.input.adf.BeckeGrid.Quality = "Excellent" log.flow(f'Fragment ({i}/{len(self.child_jobs)}) {child_name} [{formula.molecule(child._molecule)}] with STOFIT', ['split']) log.flow(f'Charge: {child_STOFIT.settings.input.ams.System.charge}', ['straight', 'straight']) log.flow(f'Spin-Polarization: {child_STOFIT.settings.input.adf.SpinPolarization}', ['straight', 'straight']) log.flow(f'Work dir: {child_STOFIT.workdir}', ['straight', 'straight']) if child_STOFIT.can_skip(): log.flow(log.Emojis.warning + ' Already ran, skipping', ['straight', 'end']) log.flow() else: log.flow(log.Emojis.good + ' Submitting', ['straight', 'end']) child_STOFIT.run() self.dependency(child_STOFIT) log.flow(f'SlurmID: {child_STOFIT.slurm_job_id}', ['straight', 'skip', 'end']) log.flow() child_NoElectrons = ADFJob(child) child_NoElectrons.name += '_NoElectrons' elstat_jobs[child_NoElectrons.name] = child_NoElectrons child_NoElectrons.settings.input.adf.STOFIT = '' child_NoElectrons.settings.input.adf.PRINT = 'SFOSiteEnergies Elstat' child_NoElectrons.charge(molecule.number_of_electrons(child_NoElectrons._molecule)) child_NoElectrons.spin_polarization(0) child_NoElectrons.settings.input.adf.pop("NumericalQuality") child_NoElectrons.settings.input.adf.BeckeGrid.Quality = "Excellent" log.flow(f'Fragment ({i}/{len(self.child_jobs)}) {child_name} [{formula.molecule(child._molecule)}] without Electrons', ['split']) log.flow(f'Charge: {child_NoElectrons.settings.input.ams.System.charge}', ['straight', 'straight']) log.flow(f'Spin-Polarization: {child_NoElectrons.settings.input.adf.SpinPolarization}', ['straight', 'straight']) log.flow(f'Work dir: {child_NoElectrons.workdir}', ['straight', 'straight']) if child_NoElectrons.can_skip(): log.flow(log.Emojis.warning + ' Already ran, skipping', ['straight', 'end']) log.flow() else: log.flow(log.Emojis.good + ' Submitting', ['straight', 'end']) child_NoElectrons.run() self.dependency(child_NoElectrons) log.flow(f'SlurmID: {child_NoElectrons.slurm_job_id}', ['straight', 'skip', 'end']) log.flow() # in the parent job the atoms should have the region and adf.f defined as options atom_lines = [] # for each atom we check which child it came from for atom in self._molecule: for child_name, child in self.child_jobs.items(): for childatom in child._molecule: # we check by looking at the symbol and coordinates of the atom if (atom.symbol, atom.x, atom.y, atom.z) == (childatom.symbol, childatom.x, childatom.y, childatom.z): # now write the symbol and coords as a string with the correct suffix atom_lines.append(f'\t\t{atom.symbol} {atom.x} {atom.y} {atom.z} region={child_name} adf.f={child_name}') old_name = self.name # write the atoms block as a string with new line characters self.settings.input.ams.system.atoms = ("\n" + "\n".join(atom_lines) + "\n\tEnd").expandtabs(4) # set the _molecule to None, otherwise it will overwrite the atoms block self._molecule = None # run this job self.rundir = j(self.rundir, old_name) self.name = 'complex' log.flow(log.Emojis.good + ' Submitting parent job', ['split']) super().run() log.flow(f"SlurmID: {self.slurm_job_id}", ["straight", "end"]) log.flow() # also do the calculation with SCF cycles set to 1 old_iters = self.settings.input.adf.SCF.Iterations or 300 self.SCF(iterations=0) # we must repopulate the sbatch settings for the new run [self._sbatch.pop(key, None) for key in ["D", "chdir", "J", "job_name", "o", "output"]] self.name = "complex_SCF0" log.flow(log.Emojis.good + " Submitting extra job with 0 SCF iterations", ["split"]) super().run() log.flow(f"SlurmID: {self.slurm_job_id}", ["straight", "end"]) log.flow() # also do the calculation with no electrons on the fragments if the user requested a elstat decomposition if self.decompose_elstat: frag_names = self.child_jobs.keys() # reset the SCF iterations self.SCF(iterations=old_iters) self.settings.input.adf.pop("NumericalQuality") self.settings.input.adf.BeckeGrid.Quality = "Excellent" [self._sbatch.pop(key, None) for key in ['D', 'chdir', 'J', 'job_name', 'o', 'output']] self.name = 'complex_STOFIT' # set the region and fragment files correctly atom_lines_ = [] for line in atom_lines: for frag in frag_names: line = line.replace(frag, f'{frag}_STOFIT') atom_lines_.append(line) self.settings.input.ams.system.atoms = ('\n' + '\n'.join(atom_lines_) + '\n\tEnd').expandtabs(4) # set the fragment references correctly self.settings.input.adf.pop('fragments') for frag_name in frag_names: self.settings.input.adf.fragments[frag_name + '_STOFIT'] = j(elstat_jobs['frag_' + frag_name + '_STOFIT'].workdir, 'adf.rkf') self.settings.input.adf.STOFIT = '' self.settings.input.adf.PRINT = 'SFOSiteEnergies Elstat' log.flow(log.Emojis.good + ' Submitting complex with STOFIT', ['split']) super().run() log.flow(f'SlurmID: {self.slurm_job_id}', ['straight', 'end']) log.flow() for frag in frag_names: # other_frags stores fragment names for fragments that keep their electrons other_frags = [frag_ for frag_ in frag_names if frag_ != frag] # we must repopulate the sbatch settings for the new run [self._sbatch.pop(key, None) for key in ['D', 'chdir', 'J', 'job_name', 'o', 'output']] self.name = f'complex_{frag}_NoElectrons' # set the region and fragment files correctly atom_lines_ = [] for line in atom_lines: line = line.replace(frag, f'{frag}_NoElectrons') for other_frag in other_frags: line = line.replace(other_frag, f'{other_frag}_STOFIT') atom_lines_.append(line) self.settings.input.ams.system.atoms = ('\n' + '\n'.join(atom_lines_) + '\n\tEnd').expandtabs(4) # set the fragment references correctly self.settings.input.adf.pop('fragments') self.settings.input.adf.fragments[frag + '_NoElectrons'] = j(elstat_jobs['frag_' + frag + '_NoElectrons'].workdir, 'adf.rkf') for other_frag in other_frags: self.settings.input.adf.fragments[other_frag + '_STOFIT'] = j(elstat_jobs['frag_' + other_frag + '_STOFIT'].workdir, 'adf.rkf') other_charge = sum([elstat_jobs['frag_' + other_frag + '_STOFIT'].settings.input.ams.System.charge for other_frag in other_frags]) total_charge = other_charge + (elstat_jobs['frag_' + frag + '_NoElectrons'].settings.input.ams.System.charge) self.charge(total_charge) other_spin_polarization = sum([elstat_jobs['frag_' + other_frag + '_STOFIT'].settings.input.adf.SpinPolarization for other_frag in other_frags]) total_spin_polarization = other_spin_polarization + (elstat_jobs['frag_' + frag + '_NoElectrons'].settings.input.adf.SpinPolarization) self.spin_polarization(total_spin_polarization) log.flow(log.Emojis.good + f' Submitting complex with 0 electrons in fragment {frag}', ['split']) super().run() log.flow(f'SlurmID: {self.slurm_job_id}', ['straight', 'end']) log.flow() log.flow(log.Emojis.finish + ' Done, bye!', ['startinv'])
[docs] class DensfJob(Job): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.settings = results.Result() self.rundir = "tmp" self.name = "densf" self.gridsize() self._mos = [] self._sfos = [] self.settings.ADFFile = None def __str__(self): return f"Densf({self.target}), running in {self.workdir}"
[docs] def gridsize(self, size="medium"): """ Set the size of the grid to be used by Densf. Args: size: either "coarse", "medium", "fine". Defaults to "medium". """ spell_check.check(size, ["coarse", "medium", "fine"], ignore_case=True) self.settings.grid = size
[docs] def orbital(self, orbital: "pyfmo.orbitals.sfo.SFO" or "pyfmo.orbitals.mo.MO"): # noqa: F821 """ Add a PyOrb orbital for Densf to calculate. """ import pyfmo if isinstance(orbital, (pyfmo.orbitals.sfo.SFO, pyfmo.orbitals2.objects.SFO)): self._sfos.append(orbital) elif isinstance(orbital, (pyfmo.orbitals.mo.MO, pyfmo.orbitals2.objects.MO)): self._mos.append(orbital) else: raise TCJobError(job_class=self.__class__.__name__, message=f"Unknown object {orbital} of type{type(orbital)}. It should be a pyfmo.orbitals.sfo.SFO or pyfmo.orbitals.mos.MO object.") # check if the ADFFile is the same for all added orbitals if self.settings.ADFFile is None: self.settings.ADFFile = orbital.kfpath elif self.settings.ADFFile != orbital.kfpath: raise TCJobError(job_class=self.__class__.__name__, message="RKF file that was previously set not the same as the one being set now. Please start a new job for each RKF file.")
def _setup_job(self): os.makedirs(self.workdir, exist_ok=True) # set up the input file. This should always contain calling of the densf program, as per the SCM documentation with open(self.inputfile_path, "w+") as inpf: inpf.write("$AMSBIN/densf << eor\n") inpf.write(f"ADFFile {self.settings.ADFFile}\n") inpf.write(f"GRID {self.settings.grid}\n") inpf.write("END\n") if len(self._mos) > 0: inpf.write("Orbitals SCF\n") for orb in self._mos: inpf.write(f' {orb.symmetry} {orb.index_in_symlabel + 1}\n') inpf.write('END\n') if len(self._sfos) > 0: inpf.write("Orbitals SFO\n") for orb in self._sfos: inpf.write(f" {orb.symmetry} {orb.index}\n") inpf.write("END\n") # cuboutput prefix is always the original run directory containing the adf.rkf file and includes the grid size inpf.write(f"CUBOUTPUT {os.path.split(self.settings.ADFFile)[0]}/{self.settings.grid}\n") inpf.write("eor\n") # the runfile should simply execute the input file. with open(self.runfile_path, "w+") as runf: runf.write("#!/bin/sh\n\n") runf.write("\n".join(self._preambles) + "\n\n") runf.write(f"sh {self.inputfile_path}\n") runf.write("\n".join(self._postambles)) return True @property def output_cub_paths(self): """ The output cube file paths that will be/were calculated by this job. """ paths = [] cuboutput = f"{os.path.split(self.settings.ADFFile)[0]}/{self.settings.grid}" for mo in self._mos: spin_part = '' if mo.spin == 'AB' else f'_{mo.spin}' paths.append(f'{cuboutput}%SCF_{mo.symmetry}{spin_part}%{mo.index_in_symlabel + 1}.cub') for sfo in self._sfos: spin_part = '' if sfo.spin == 'AB' else f'_{sfo.spin}' paths.append(f'{cuboutput}%SFO_{sfo.symmetry}{spin_part}%{sfo.index}.cub') return paths
[docs] def can_skip(self): return all(os.path.exists(path) for path in self.output_cub_paths)
if __name__ == "__main__": # import pyfmo # orbs = pyfmo.orbitals.Orbitals('/Users/yumanhordijk/PhD/MM2024/calculations/IRC/pi_beta_trans_TS1/pi_beta/pi_beta_trans/complex.00039/adf.rkf') # with DensfJob() as job: # job.orbital(orbs.sfos['frag1(HOMO)']) with ADFFragmentJob() as job: ...