import os
from scm import plams
from tcutility import data, formula, log, molecule, results, spell_check
from tcutility.errors import TCCompDetailsError, TCJobError
from tcutility.job.ams import AMSJob
from tcutility.job.generic import Job
j = os.path.join
[docs]
class ADFJob(AMSJob):
def __init__(self, *args, **kwargs):
self.settings = results.Result()
self._functional = None
self._core = None
self.solvent("vacuum")
self.basis_set("TZ2P")
self.quality("Good")
self.SCF(thresh=1e-8)
self.single_point()
# by default print the fock matrix
self.settings.input.adf.print = "SFOSiteEnergies"
super().__init__(*args, **kwargs)
def __str__(self):
return f"{self._task}({self._functional}/{self._basis_set}), running in {os.path.join(os.path.abspath(self.rundir), self.name)}"
[docs]
def basis_set(self, typ: str = "TZ2P", core: str = "None"):
"""
Set the basis-set type and frozen core approximation for this calculation.
Args:
typ: the type of basis-set to use. Default is TZ2P.
core: the size of the frozen core approximation. Default is None.
Raises:
ValueError: if the basis-set name or core is incorrect.
.. note:: If the selected functional is the r2SCAN-3c functional, then the basis-set will be set to mTZ2P.
.. seealso::
:mod:`tcutility.data.basis_sets` for an overview of the available basis-sets in ADF.
"""
spell_check.check(typ, data.basis_sets.available_basis_sets["ADF"], ignore_case=True)
spell_check.check(core, ["None", "Small", "Large"], ignore_case=True)
if self._functional == "r2SCAN-3c" and typ != "mTZ2P":
log.warn(f"Basis set {typ} is not allowed with r2SCAN-3c, switching to mTZ2P.")
typ = "mTZ2P"
self._basis_set = typ
self._core = core
self.settings.input.adf.basis.type = typ
self.settings.input.adf.basis.core = core
[docs]
def spin_polarization(self, val: int):
"""
Set the spin-polarization of the system. If the value is not zero the calculation will also be unrestricted.
"""
self.settings.input.adf.SpinPolarization = val
if val != 0:
self.settings.input.adf.Unrestricted = "Yes"
[docs]
def multiplicity(self, val: int):
"""
Set the multiplicity of the system. If the value is not one the calculation will also be unrestricted.
We use the following values:
1) singlet
2) doublet
3) triplet
4) ...
The multiplicity is equal to 2*S+1 for spin-polarization S.
"""
self.settings.input.adf.SpinPolarization = (val - 1) // 2
if val != 1:
self.settings.input.adf.Unrestricted = "Yes"
[docs]
def unrestricted(self, val: bool):
"""
Whether the calculation should be unrestricted.
"""
self.settings.input.adf.Unrestricted = "Yes" if val else "No"
[docs]
def occupations(self, strategy: str):
"""
Set the orbital filling strategy for ADF.
Args:
strategy: the name of the filling strategy. This can contain multiple of the options allowed.
.. seealso::
The SCM documentation can be found at https://www.scm.com/doc/ADF/Input/Electronic_Configuration.html#aufbau-smearing-freezing
"""
self.settings.input.adf.Occupations = strategy
[docs]
def quality(self, val: str = "Good"):
"""
Set the numerical quality of the calculation.
Args:
val: the numerical quality value to set to. This is the same as the ones used in the ADF GUI. Defaults to Good.
Raises:
ValueError: if the quality value is incorrect.
"""
spell_check.check(val, ["Basic", "Normal", "Good", "VeryGood", "Excellent"], ignore_case=True)
self.settings.input.adf.NumericalQuality = val
[docs]
def SCF_convergence(self, thresh: float = 1e-8):
"""
Set the SCF convergence criteria for the job.
Args:
thresh: the convergence criteria for the SCF procedure. Defaults to 1e-8.
.. deprecated:: 0.9.2
Please use :meth:`ADFJob.SCF` instead of this method.
"""
log.warn("This method has been deprecated, please use ADFJob.SCF instead.")
self.SCF(thresh=thresh)
[docs]
def SCF(self, iterations: int = 300, thresh: float = 1e-8):
"""
Set the SCF settings for this calculations.
Args:
iterations: number of iterations to perform for this calculation. Defaults to 300.
thresh: the convergence criteria for the SCF procedure. Defaults to 1e-8.
.. note::
When setting the number of iterations to 0 or 1 the ``AlwaysClaimSuccess`` key will also be set to ``Yes``.
This is to prevent the job from being flagged as a failure when reading it using :mod:`tcutility.results`.
"""
self.settings.input.adf.SCF.Iterations = iterations
self.settings.input.adf.SCF.Converge = thresh
if iterations in [0, 1]:
self.settings.input.ams.EngineDebugging.AlwaysClaimSuccess = "Yes"
[docs]
def functional(self, funtional_name: str, dispersion: str = None):
"""
Set the functional to be used by the calculation. This also sets the dispersion if it is specified in the functional name.
Args:
funtional_name: the name of the functional. The value can be the same as the ones used in the ADF GUI.
dispersion: dispersion setting to use with the functional. This is used when you want to use a functional from LibXC.
Raises:
ValueError: if the functional name is not recognized.
.. note:: Setting the functional to r2SCAN-3c will automatically set the basis-set to mTZ2P.
.. seealso::
:mod:`tcutility.data.functionals` for an overview of the available functionals in ADF.
"""
# before adding the new functional we should clear any previous functional settings
self.settings.input.adf.pop("XC", None)
functional = funtional_name.strip()
functional = functional.replace("-D4(EEQ)", "-D4") # D4(EEQ) and D4 are the same, unlike D3 and D3(BJ)
self._functional = functional
if functional == "r2SCAN-3c" and self._basis_set != "mTZ2P":
log.info(f"Switching basis set from {self._basis_set} to mTZ2P for r2SCAN-3c.")
self.basis_set("mTZ2P")
if functional == "SSB-D":
raise TCCompDetailsError(section="Functional", message='There are two functionals called SSB-D, please use "GGA:SSB-D" or "MetaGGA:SSB-D".')
if not data.functionals.get(functional):
raise TCCompDetailsError(section="Functional", message=f"XC-functional {functional} not found.")
else:
func = data.functionals.get(functional)
self.settings.input.adf.update(func.adf_settings)
[docs]
def relativity(self, level: str = "Scalar"):
"""
Set the treatment of relativistic effects for this calculation.
Args:
level: the level to set. Can be the same as the values in the ADF GUI and documentation. By default it is set to Scalar.
Raises:
ValueError: if the relativistic correction level is not correct.
"""
spell_check.check(level, ["Scalar", "None", "Spin-Orbit"], ignore_case=True)
self.settings.input.adf.relativity.level = level
[docs]
def solvent(self, name: str = None, eps: float = None, rad: float = None, use_klamt: bool = False):
"""
Model solvation using COSMO for this calculation.
Args:
name: the name of the solvent you want to use. Please see the ADF manual for a list of available solvents.
eps: the dielectric constant of your solvent. You can use this in place of the solvent name if you need more control over COSMO.
rad: the radius of the solvent molecules. You can use this in place of the solvent name if you need more control over COSMO.
use_klamt: whether to use the klamt atomic radii. This is usually used when you have charged species (?).
Raises:
ValueError: if the solvent name is given, but incorrect.
.. seealso::
:mod:`tcutility.data.cosmo` for an overview of the available solvent names and formulas.
"""
if name:
spell_check.check(name, data.cosmo.available_solvents, ignore_case=True, insertion_cost=0.3)
self._solvent = name
else:
self._solvent = f"COSMO(eps={eps} rad={rad})"
if name == "vacuum":
self.settings.input.adf.pop("Solvation", None)
return
self.settings.input.adf.Solvation.Surf = "Delley"
solv_string = ""
if name:
solv_string += f"name={name} "
else:
self.settings.input.adf.Solvation.Solv = f"eps={eps} rad={rad} "
if use_klamt:
solv_string += "cav0=0.0 cav1=0.0"
else:
solv_string += "cav0=0.0 cav1=0.0067639"
self.settings.input.adf.Solvation.Solv = solv_string
self.settings.input.adf.Solvation.Charged = "method=CONJ corr"
self.settings.input.adf.Solvation["C-Mat"] = "POT"
self.settings.input.adf.Solvation.SCF = "VAR ALL"
self.settings.input.adf.Solvation.CSMRSP = None
if use_klamt:
radii = {"H": 1.30, "C": 2.00, "N": 1.83, "O": 1.72, "F": 1.72, "Si": 2.48, "P": 2.13, "S": 2.16, "Cl": 2.05, "Br": 2.16, "I": 2.32}
self.settings.input.adf.solvation.radii = radii
[docs]
def symmetry(self, group: str):
"""
Specify the symmetry group to be used by ADF.
Args:
group: the symmetry group to be used.
"""
self.settings.input.adf.Symmetry = group
def _check_job(self):
# if we have spinpolarization we do not want to use DFTB to calculate the initial hessian
if self.settings.input.adf.SpinPolarization != 0 and self.settings.input.ams.GeometryOptimization.InitialHessian.Type == "CalculateWithFastEngine":
# we simply remove it from the geometryoptimization block
self.settings.input.ams.GeometryOptimization.pop("InitialHessian", None)
[docs]
class ADFFragmentJob(ADFJob):
def __init__(self, *args, **kwargs):
self.decompose_elstat = kwargs.pop('decompose_elstat', False)
self.child_jobs = {}
# self.child_jobs_no_electrons = {}
super().__init__(*args, **kwargs)
self.name = "EDA"
[docs]
def add_fragment(self, mol: plams.Molecule, name: str = None, charge: int = 0, spin_polarization: int = 0):
"""
Add a fragment to this job. Optionally give the name, charge and spin-polarization of the fragment as well.
Args:
mol: the molecule corresponding to the fragment.
name: the name of the fragment. By default it will be set to ``fragment{N+1}`` if ``N`` is the number of fragments already present.
charge: the charge of the fragment to be added.
spin_polarization: the spin-polarization of the fragment to be added.
"""
# in case of giving fragments as indices we dont want to add the fragment to the molecule later
add_frag_to_mol = True
# we can be given a list of atoms
if isinstance(mol, list) and isinstance(mol[0], plams.Atom):
mol_ = plams.Molecule()
[mol_.add_atom(atom) for atom in mol]
mol = mol_
# or a list of integers
if isinstance(mol, list) and isinstance(mol[0], int):
if not self._molecule:
log.error(f"Trying to add fragment based on atom indices, but main job does not have a molecule yet. Call the {self.__class__.__name__}.molecule method to add one.")
return
mol_ = plams.Molecule()
[mol_.add_atom(self._molecule[i]) for i in mol]
mol = mol_.copy()
add_frag_to_mol = False
# check if the atoms in the new fragment are already present in the other fragments.
# if it is we should raise an error
for child in self.child_jobs.values():
if any((atom.symbol, atom.coords) == (myatom.symbol, myatom.coords) for atom in child._molecule for myatom in mol):
raise TCJobError(job_class=self.__class__.__name__, message="The atoms in the new fragment are already present in the other fragments.")
name = name or f"fragment{len(self.child_jobs) + 1}"
self.child_jobs[name] = ADFJob(test_mode=self.test_mode)
self.child_jobs[name].molecule(mol)
self.child_jobs[name].charge(charge)
self.child_jobs[name].spin_polarization(spin_polarization)
setattr(self, name, self.child_jobs[name])
if not add_frag_to_mol:
return
if self._molecule is None:
self._molecule = self.child_jobs[name]._molecule.copy()
else:
for atom in self.child_jobs[name]._molecule.copy():
if any((atom.symbol, atom.coords) == (myatom.symbol, myatom.coords) for myatom in self._molecule):
continue
self._molecule.add_atom(atom)
[docs]
def guess_fragments(self):
"""
Guess what the fragments are based on data stored in the molecule provided for this job.
This will automatically set the correct fragment molecules, names, charges and spin-polarizations.
.. seealso::
| :func:`tcutility.molecule.guess_fragments` for an explanation of the xyz-file format required to guess the fragments.
| :meth:`ADFFragmentJob.add_fragment` to manually add a fragment.
.. note::
This function will be automatically called if there were no fragments given to this calculation.
"""
frags = molecule.guess_fragments(self._molecule)
if frags is None:
log.error("Could not load fragment data for the molecule.")
return False
for fragment_name, fragment in frags.items():
charge = fragment.flags.get("charge", 0)
spin_polarization = fragment.flags.get("spinpol", 0)
self.add_fragment(fragment, fragment_name, charge=charge, spin_polarization=spin_polarization)
return True
[docs]
def remove_virtuals(self, frag=None, subspecies=None, nremove=None):
"""
Remove virtual orbitals from the fragments.
Args:
frag: the fragment to remove virtuals from. If set to ``None`` we remove all virtual orbitals of all fragments.
subspecies: the symmetry subspecies to remove virtuals from. If set to ``None`` we assume we have ``A`` subspecies.
nremove: the number of virtuals to remove. If set to ``None`` we will guess the number of virtuals based on the basis-set chosen.
"""
if frag is None:
self.settings.input.adf.RemoveAllFragVirtuals = "Yes"
return
# if nremove is not given we will get it from the atoms in the fragment
if nremove is None:
# guess the virtual numbers only works for non-frozen-core calculations
if self._core.lower() != "none":
raise TCJobError(job_class=self.__class__.__name__, message="Cannot guess number of virtual orbitals for calculations with a frozen core.")
# the basis-set has to be present in the prepared data
if self._basis_set.lower() not in [bs.lower() for bs in data.basis_sets._number_of_orbitals.keys()]:
raise TCJobError(job_class=self.__class__.__name__, message=f"Cannot guess number of virtual orbitals for calculations with the {self._basis_set} basis-set.")
# sum up the number of virtuals per atom in the fragment
nremove = 0
for atom in self.child_jobs[frag]._molecule:
nremove += data.basis_sets.number_of_virtuals(atom.symbol, self._basis_set)
# positive charge adds a virtual and negative removes a virtual
nremove += self.child_jobs[frag].settings.input.ams.System.charge or 0
self.settings.input.adf.setdefault("RemoveFragOrbitals", "")
self.settings.input.adf.RemoveFragOrbitals += f"""
{frag}
{subspecies or 'A'} {nremove}
SubEnd
End
"""
[docs]
def run(self):
"""
Run the ``ADFFragmentJob``. This involves setting up the calculations for each fragment as well as the parent job.
It will also submit a calculation with the SCF iterations set to 0 in order to facilitate investigation of the field effects using PyOrb.
"""
# check if the user defined fragments for this job
if len(self.child_jobs) == 0:
log.warn("Fragments were not specified yet, trying to read them from the xyz file ...")
# if they did not define the fragments, try to guess them using the xyz-file
if not self.guess_fragments():
log.error("Please specify the fragments using ADFFragmentJob.add_fragment or specify them in the xyz file.")
raise
mol_str = " + ".join([formula.molecule(child._molecule) for child in self.child_jobs.values()])
log.flow(f"ADFFragmentJob [{mol_str}]", ["start"])
# obtain some system wide properties of the molecules
charge = sum([child.settings.input.ams.System.charge or 0 for child in self.child_jobs.values()])
unrestricted = any([(child.settings.input.adf.Unrestricted or "no").lower() == "yes" for child in self.child_jobs.values()])
spinpol = sum([child.settings.input.adf.SpinPolarization or 0 for child in self.child_jobs.values()])
log.flow(f"Level: {self._functional}/{self._basis_set}")
log.flow(f"Solvent: {self._solvent}")
log.flow(f"Charge: {charge}", ["straight"])
log.flow(f"Unrestricted: {unrestricted}", ["straight"])
log.flow(f"Spin-Polarization: {spinpol}", ["straight"])
log.flow()
# this job and all its children should have the same value for unrestricted
[child.unrestricted(unrestricted) for child in self.child_jobs.values()]
# propagate the post- and preambles to the child_jobs
[child.add_preamble(preamble) for preamble in self._preambles for child in self.child_jobs.values()]
[child.add_postamble(postamble) for postamble in self._postambles for child in self.child_jobs.values()]
# we now update the child settings with the parent settings
# this is because we have to propagate settings such as functionals, basis sets etc.
sett = self.settings.as_plams_settings() # first create a plams settings object
# same for the children
child_setts = {name: child.settings.as_plams_settings() for name, child in self.child_jobs.items()}
# update the children using the parent settings
[child_sett.update(sett) for child_sett in child_setts.values()]
[child_sett.input.adf.pop("RemoveFragOrbitals", None) for child_sett in child_setts.values()]
[child_sett.input.adf.pop("RemoveAllFragVirtuals", None) for child_sett in child_setts.values()]
# same for sbatch settings
[child.sbatch(**self._sbatch) for child in self.child_jobs.values()]
# now set the charge, spinpol, unrestricted for the parent
self.charge(charge)
self.spin_polarization(spinpol)
self.unrestricted(unrestricted)
if unrestricted:
self.settings.input.adf.UnrestrictedFragments = "Yes"
elstat_jobs = {}
# now we are going to run each child job
for i, (child_name, child) in enumerate(self.child_jobs.items(), start=1):
# the child name will be prepended with SP showing that it is the singlepoint calculation
child.name = f'frag_{child_name}'
child.rundir = j(self.rundir, self.name)
# # add the path to the child adf.rkf file as a dependency to the parent job
self.settings.input.adf.fragments[child_name] = j(child.workdir, 'adf.rkf')
# recast the plams.Settings object into a Result object as that is what run expects
child.settings = results.Result(child_setts[child_name])
log.flow(f'Fragment ({i}/{len(self.child_jobs)}) {child_name} [{formula.molecule(child._molecule)}]', ['split'])
log.flow(f'Charge: {child.settings.input.ams.System.charge}', ['straight', 'straight'])
log.flow(f'Spin-Polarization: {child.settings.input.adf.SpinPolarization}', ['straight', 'straight'])
log.flow(f'Work dir: {child.workdir}', ['straight', 'straight'])
if child.can_skip():
log.flow(log.Emojis.warning + " Already ran, skipping", ["straight", "end"])
log.flow()
else:
log.flow(log.Emojis.good + ' Submitting', ['straight', 'end'])
child.run()
self.dependency(child)
log.flow(f'SlurmID: {child.slurm_job_id}', ['straight', 'skip', 'end'])
log.flow()
if self.decompose_elstat:
child_STOFIT = ADFJob(child)
child_STOFIT.name += '_STOFIT'
elstat_jobs[child_STOFIT.name] = child_STOFIT
child_STOFIT.settings.input.adf.STOFIT = ''
child_STOFIT.settings.input.adf.PRINT = 'SFOSiteEnergies Elstat'
child_STOFIT.settings.input.adf.pop("NumericalQuality")
child_STOFIT.settings.input.adf.BeckeGrid.Quality = "Excellent"
log.flow(f'Fragment ({i}/{len(self.child_jobs)}) {child_name} [{formula.molecule(child._molecule)}] with STOFIT', ['split'])
log.flow(f'Charge: {child_STOFIT.settings.input.ams.System.charge}', ['straight', 'straight'])
log.flow(f'Spin-Polarization: {child_STOFIT.settings.input.adf.SpinPolarization}', ['straight', 'straight'])
log.flow(f'Work dir: {child_STOFIT.workdir}', ['straight', 'straight'])
if child_STOFIT.can_skip():
log.flow(log.Emojis.warning + ' Already ran, skipping', ['straight', 'end'])
log.flow()
else:
log.flow(log.Emojis.good + ' Submitting', ['straight', 'end'])
child_STOFIT.run()
self.dependency(child_STOFIT)
log.flow(f'SlurmID: {child_STOFIT.slurm_job_id}', ['straight', 'skip', 'end'])
log.flow()
child_NoElectrons = ADFJob(child)
child_NoElectrons.name += '_NoElectrons'
elstat_jobs[child_NoElectrons.name] = child_NoElectrons
child_NoElectrons.settings.input.adf.STOFIT = ''
child_NoElectrons.settings.input.adf.PRINT = 'SFOSiteEnergies Elstat'
child_NoElectrons.charge(molecule.number_of_electrons(child_NoElectrons._molecule))
child_NoElectrons.spin_polarization(0)
child_NoElectrons.settings.input.adf.pop("NumericalQuality")
child_NoElectrons.settings.input.adf.BeckeGrid.Quality = "Excellent"
log.flow(f'Fragment ({i}/{len(self.child_jobs)}) {child_name} [{formula.molecule(child._molecule)}] without Electrons', ['split'])
log.flow(f'Charge: {child_NoElectrons.settings.input.ams.System.charge}', ['straight', 'straight'])
log.flow(f'Spin-Polarization: {child_NoElectrons.settings.input.adf.SpinPolarization}', ['straight', 'straight'])
log.flow(f'Work dir: {child_NoElectrons.workdir}', ['straight', 'straight'])
if child_NoElectrons.can_skip():
log.flow(log.Emojis.warning + ' Already ran, skipping', ['straight', 'end'])
log.flow()
else:
log.flow(log.Emojis.good + ' Submitting', ['straight', 'end'])
child_NoElectrons.run()
self.dependency(child_NoElectrons)
log.flow(f'SlurmID: {child_NoElectrons.slurm_job_id}', ['straight', 'skip', 'end'])
log.flow()
# in the parent job the atoms should have the region and adf.f defined as options
atom_lines = []
# for each atom we check which child it came from
for atom in self._molecule:
for child_name, child in self.child_jobs.items():
for childatom in child._molecule:
# we check by looking at the symbol and coordinates of the atom
if (atom.symbol, atom.x, atom.y, atom.z) == (childatom.symbol, childatom.x, childatom.y, childatom.z):
# now write the symbol and coords as a string with the correct suffix
atom_lines.append(f'\t\t{atom.symbol} {atom.x} {atom.y} {atom.z} region={child_name} adf.f={child_name}')
old_name = self.name
# write the atoms block as a string with new line characters
self.settings.input.ams.system.atoms = ("\n" + "\n".join(atom_lines) + "\n\tEnd").expandtabs(4)
# set the _molecule to None, otherwise it will overwrite the atoms block
self._molecule = None
# run this job
self.rundir = j(self.rundir, old_name)
self.name = 'complex'
log.flow(log.Emojis.good + ' Submitting parent job', ['split'])
super().run()
log.flow(f"SlurmID: {self.slurm_job_id}", ["straight", "end"])
log.flow()
# also do the calculation with SCF cycles set to 1
old_iters = self.settings.input.adf.SCF.Iterations or 300
self.SCF(iterations=0)
# we must repopulate the sbatch settings for the new run
[self._sbatch.pop(key, None) for key in ["D", "chdir", "J", "job_name", "o", "output"]]
self.name = "complex_SCF0"
log.flow(log.Emojis.good + " Submitting extra job with 0 SCF iterations", ["split"])
super().run()
log.flow(f"SlurmID: {self.slurm_job_id}", ["straight", "end"])
log.flow()
# also do the calculation with no electrons on the fragments if the user requested a elstat decomposition
if self.decompose_elstat:
frag_names = self.child_jobs.keys()
# reset the SCF iterations
self.SCF(iterations=old_iters)
self.settings.input.adf.pop("NumericalQuality")
self.settings.input.adf.BeckeGrid.Quality = "Excellent"
[self._sbatch.pop(key, None) for key in ['D', 'chdir', 'J', 'job_name', 'o', 'output']]
self.name = 'complex_STOFIT'
# set the region and fragment files correctly
atom_lines_ = []
for line in atom_lines:
for frag in frag_names:
line = line.replace(frag, f'{frag}_STOFIT')
atom_lines_.append(line)
self.settings.input.ams.system.atoms = ('\n' + '\n'.join(atom_lines_) + '\n\tEnd').expandtabs(4)
# set the fragment references correctly
self.settings.input.adf.pop('fragments')
for frag_name in frag_names:
self.settings.input.adf.fragments[frag_name + '_STOFIT'] = j(elstat_jobs['frag_' + frag_name + '_STOFIT'].workdir, 'adf.rkf')
self.settings.input.adf.STOFIT = ''
self.settings.input.adf.PRINT = 'SFOSiteEnergies Elstat'
log.flow(log.Emojis.good + ' Submitting complex with STOFIT', ['split'])
super().run()
log.flow(f'SlurmID: {self.slurm_job_id}', ['straight', 'end'])
log.flow()
for frag in frag_names:
# other_frags stores fragment names for fragments that keep their electrons
other_frags = [frag_ for frag_ in frag_names if frag_ != frag]
# we must repopulate the sbatch settings for the new run
[self._sbatch.pop(key, None) for key in ['D', 'chdir', 'J', 'job_name', 'o', 'output']]
self.name = f'complex_{frag}_NoElectrons'
# set the region and fragment files correctly
atom_lines_ = []
for line in atom_lines:
line = line.replace(frag, f'{frag}_NoElectrons')
for other_frag in other_frags:
line = line.replace(other_frag, f'{other_frag}_STOFIT')
atom_lines_.append(line)
self.settings.input.ams.system.atoms = ('\n' + '\n'.join(atom_lines_) + '\n\tEnd').expandtabs(4)
# set the fragment references correctly
self.settings.input.adf.pop('fragments')
self.settings.input.adf.fragments[frag + '_NoElectrons'] = j(elstat_jobs['frag_' + frag + '_NoElectrons'].workdir, 'adf.rkf')
for other_frag in other_frags:
self.settings.input.adf.fragments[other_frag + '_STOFIT'] = j(elstat_jobs['frag_' + other_frag + '_STOFIT'].workdir, 'adf.rkf')
other_charge = sum([elstat_jobs['frag_' + other_frag + '_STOFIT'].settings.input.ams.System.charge for other_frag in other_frags])
total_charge = other_charge + (elstat_jobs['frag_' + frag + '_NoElectrons'].settings.input.ams.System.charge)
self.charge(total_charge)
other_spin_polarization = sum([elstat_jobs['frag_' + other_frag + '_STOFIT'].settings.input.adf.SpinPolarization for other_frag in other_frags])
total_spin_polarization = other_spin_polarization + (elstat_jobs['frag_' + frag + '_NoElectrons'].settings.input.adf.SpinPolarization)
self.spin_polarization(total_spin_polarization)
log.flow(log.Emojis.good + f' Submitting complex with 0 electrons in fragment {frag}', ['split'])
super().run()
log.flow(f'SlurmID: {self.slurm_job_id}', ['straight', 'end'])
log.flow()
log.flow(log.Emojis.finish + ' Done, bye!', ['startinv'])
[docs]
class DensfJob(Job):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.settings = results.Result()
self.rundir = "tmp"
self.name = "densf"
self.gridsize()
self._mos = []
self._sfos = []
self.settings.ADFFile = None
def __str__(self):
return f"Densf({self.target}), running in {self.workdir}"
[docs]
def gridsize(self, size="medium"):
"""
Set the size of the grid to be used by Densf.
Args:
size: either "coarse", "medium", "fine". Defaults to "medium".
"""
spell_check.check(size, ["coarse", "medium", "fine"], ignore_case=True)
self.settings.grid = size
[docs]
def orbital(self, orbital: "pyfmo.orbitals.sfo.SFO" or "pyfmo.orbitals.mo.MO"): # noqa: F821
"""
Add a PyOrb orbital for Densf to calculate.
"""
import pyfmo
if isinstance(orbital, (pyfmo.orbitals.sfo.SFO, pyfmo.orbitals2.objects.SFO)):
self._sfos.append(orbital)
elif isinstance(orbital, (pyfmo.orbitals.mo.MO, pyfmo.orbitals2.objects.MO)):
self._mos.append(orbital)
else:
raise TCJobError(job_class=self.__class__.__name__, message=f"Unknown object {orbital} of type{type(orbital)}. It should be a pyfmo.orbitals.sfo.SFO or pyfmo.orbitals.mos.MO object.")
# check if the ADFFile is the same for all added orbitals
if self.settings.ADFFile is None:
self.settings.ADFFile = orbital.kfpath
elif self.settings.ADFFile != orbital.kfpath:
raise TCJobError(job_class=self.__class__.__name__, message="RKF file that was previously set not the same as the one being set now. Please start a new job for each RKF file.")
def _setup_job(self):
os.makedirs(self.workdir, exist_ok=True)
# set up the input file. This should always contain calling of the densf program, as per the SCM documentation
with open(self.inputfile_path, "w+") as inpf:
inpf.write("$AMSBIN/densf << eor\n")
inpf.write(f"ADFFile {self.settings.ADFFile}\n")
inpf.write(f"GRID {self.settings.grid}\n")
inpf.write("END\n")
if len(self._mos) > 0:
inpf.write("Orbitals SCF\n")
for orb in self._mos:
inpf.write(f' {orb.symmetry} {orb.index_in_symlabel + 1}\n')
inpf.write('END\n')
if len(self._sfos) > 0:
inpf.write("Orbitals SFO\n")
for orb in self._sfos:
inpf.write(f" {orb.symmetry} {orb.index}\n")
inpf.write("END\n")
# cuboutput prefix is always the original run directory containing the adf.rkf file and includes the grid size
inpf.write(f"CUBOUTPUT {os.path.split(self.settings.ADFFile)[0]}/{self.settings.grid}\n")
inpf.write("eor\n")
# the runfile should simply execute the input file.
with open(self.runfile_path, "w+") as runf:
runf.write("#!/bin/sh\n\n")
runf.write("\n".join(self._preambles) + "\n\n")
runf.write(f"sh {self.inputfile_path}\n")
runf.write("\n".join(self._postambles))
return True
@property
def output_cub_paths(self):
"""
The output cube file paths that will be/were calculated by this job.
"""
paths = []
cuboutput = f"{os.path.split(self.settings.ADFFile)[0]}/{self.settings.grid}"
for mo in self._mos:
spin_part = '' if mo.spin == 'AB' else f'_{mo.spin}'
paths.append(f'{cuboutput}%SCF_{mo.symmetry}{spin_part}%{mo.index_in_symlabel + 1}.cub')
for sfo in self._sfos:
spin_part = '' if sfo.spin == 'AB' else f'_{sfo.spin}'
paths.append(f'{cuboutput}%SFO_{sfo.symmetry}{spin_part}%{sfo.index}.cub')
return paths
[docs]
def can_skip(self):
return all(os.path.exists(path) for path in self.output_cub_paths)
if __name__ == "__main__":
# import pyfmo
# orbs = pyfmo.orbitals.Orbitals('/Users/yumanhordijk/PhD/MM2024/calculations/IRC/pi_beta_trans_TS1/pi_beta/pi_beta_trans/complex.00039/adf.rkf')
# with DensfJob() as job:
# job.orbital(orbs.sfos['frag1(HOMO)'])
with ADFFragmentJob() as job:
...