import os
import shutil
import stat
import subprocess as sp
from typing import List, Union
import dictfunc
from scm import plams
from tcutility import log, molecule, results, slurm
from tcutility.environment import OSName, get_os_name
from tcutility.errors import TCJobError
j = os.path.join
def _python_path():
"""
    Sometimes the full path to the Python executable is needed, because some environments do not have it on their ``PATH``.
    This function attempts to find the Python path and returns it, falling back to plain ``python`` if it cannot be resolved.
"""
python = sp.run("which python", shell=True, capture_output=True).stdout.decode().strip()
if python == "" or not os.path.exists(python):
python = sp.run("which python3", shell=True, capture_output=True).stdout.decode().strip()
    # fall back to the plain 'python' executable if we could not resolve a full path
if python == "" or not os.path.exists(python):
python = "python"
return python
class Job:
"""This is the base Job class used to build more advanced classes such as :class:`AMSJob <tcutility.job.ams.AMSJob>` and :class:`ORCAJob <tcutility.job.orca.ORCAJob>`.
The base class contains an empty :class:`Result <tcutility.results.result.Result>` object that holds the settings.
It also provides :meth:`__enter__` and :meth:`__exit__` methods to make use of context manager syntax.
    All methods are in principle safe to override, but the :meth:`_setup_job` method **must** be overridden in subclasses.
Args:
        test_mode: whether to enable test mode. If enabled, the job is set up as usual, but the running step is skipped. This is useful if you want to inspect the job settings before running the real calculations.
        overwrite: whether to overwrite a previously run job in the same working directory.
        wait_for_finish: whether to wait for this job to finish running before continuing your runscript.
        delete_on_finish: whether to remove the workdir for this job after it has finished running.
        delete_on_fail: whether to remove the workdir for this job if the calculation failed.
        use_slurm: whether to submit the job with slurm when slurm is available, instead of running it locally.
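
    Example:
        A minimal sketch of the context-manager workflow, assuming the :class:`ADFJob <tcutility.job.adf.ADFJob>` subclass and a local coordinate file (``water.xyz`` is a placeholder):

        .. code-block:: python

            from tcutility.job.adf import ADFJob

            with ADFJob() as job:
                job.molecule('water.xyz')  # placeholder coordinate file
                job.rundir = './calculations'
                job.name = 'water_sp'
                # the job is set up and submitted (or run locally) when the with-block exits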
"""
def __init__(
        self, *base_jobs: "Job", test_mode: bool = None, overwrite: bool = None, wait_for_finish: bool = None, delete_on_finish: bool = None, delete_on_fail: bool = None, use_slurm: bool = True
):
self._sbatch = results.Result()
self._molecule = None
self._molecule_path = None
self.slurm_job_id = None
self.name = "calc"
self.rundir = "tmp"
self._preambles = []
self._postambles = []
self._postscripts = []
self.test_mode = test_mode
self.overwrite = overwrite
self.wait_for_finish = wait_for_finish
        self.delete_on_finish = delete_on_finish
        self.delete_on_fail = delete_on_fail
        self.use_slurm = use_slurm
# update this job with base_jobs
for base_job in base_jobs:
self.__dict__.update(base_job.copy().__dict__)
self.test_mode = self.test_mode if test_mode is None else test_mode
self.overwrite = self.overwrite if overwrite is None else overwrite
self.wait_for_finish = self.wait_for_finish if wait_for_finish is None else wait_for_finish
self.delete_on_finish = self.delete_on_finish if delete_on_finish is None else delete_on_finish
        self.delete_on_fail = self.delete_on_fail if delete_on_fail is None else delete_on_fail
self.use_slurm = self.use_slurm if use_slurm is None else use_slurm
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
if exc_type:
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
log.error(f'Job set-up failed with exception: {exc_type.__name__}({exc_value}) in File "{fname}", line {exc_tb.tb_lineno}.')
return True
self.run()
def can_skip(self):
"""
Check whether the job can be skipped. We check this by loading the calculation and checking if the job status was fatal.
        Fatal in this case means that the job failed, was canceled, or could not be found. In those cases we want to run the job.
.. note::
This method works for the :class:`ADFJob <tcutility.job.adf.ADFJob>`, :class:`ADFFragmentJob <tcutility.job.adf.ADFFragmentJob>`,
:class:`DFTBJob <tcutility.job.dftb.DFTBJob>` and :class:`ORCAJob <tcutility.job.orca.ORCAJob>` objects, but not
yet for :class:`CRESTJob <tcutility.job.crest.CRESTJob>` and :class:`QCGJob <tcutility.job.crest.QCGJob>`. For the latter objects the job will always be rerun.
This will be fixed in a later version of TCutility.
"""
res = results.read(self.workdir)
return not res.status.fatal
def in_queue(self):
"""
Check whether the job is currently managed by slurm.
We check this by loading the calculation and checking if the job status is 'RUNNING', 'COMPLETING', 'CONFIGURING' or 'PENDING'.
"""
res = results.read(self.workdir)
return res.status.name in ["RUNNING", "COMPLETING", "CONFIGURING", "PENDING"]
def __repr__(self):
return f"{type(self)}(name={self.name}, rundir={self.rundir})"
def sbatch(self, **kwargs):
"""
Change slurm settings, for example, to change the partition or change the number of cores to use.
The arguments are the same as you would use for sbatch (`see sbatch manual <https://slurm.schedmd.com/sbatch.html>`_). E.g. to change the partition to 'tc' call:
``job.sbatch(p='tc')`` or ``job.sbatch(partition='tc')``.
Flags can be set as arguments with a boolean to enable or disable them:
``job.sbatch(exclusive=True)`` will set the ``--exclusive`` flag.
.. warning::
            Note that some sbatch options, such as ``--job-name``, contain a dash, which cannot be used in Python keyword arguments.
To use these options you should use an underscore, like ``job.sbatch(job_name='water_dimer_GO')``.
.. note::
When running the job using sbatch we add a few extra default options:
* ``-D/--chdir {self.workdir}`` to make sure the job starts in the correct directory
* ``-J/--job-name {self.rundir}/{self.name}`` to give a nicer job-name when calling squeue
* ``-o/--output {self.name}.out`` to redirect the output from the default slurm-{id}.out
You can still overwrite them if you wish.
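
        Example:
            A short sketch; the partition name, core count and wall time below are placeholders for whatever your cluster provides:

            .. code-block:: python

                # partition (-p), number of tasks (-n) and wall time (-t) are placeholders
                job.sbatch(p='tc', n=32, t='120:00:00')
                # flags are enabled by passing a boolean
                job.sbatch(exclusive=True)
                # options containing a dash are given with an underscore instead
                job.sbatch(job_name='water_dimer_GO')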
"""
for key, value in kwargs.items():
if key == "dependency" and "dependency" in self._sbatch:
value = self._sbatch["dependency"] + "," + value
self._sbatch[key] = value
def _setup_job(self):
"""
Set up the current job. This method should create the working directory, runscript and input file.
        The method must return ``True`` if the setup was successful.
"""
raise NotImplementedError("You must implement the _setup_job method in your subclass.")
def run(self):
"""
        Run this job. If slurm is available and enabled, we submit this job using sbatch. Otherwise, we run the job locally.
"""
if self.overwrite:
            shutil.rmtree(self.workdir, ignore_errors=True)
os.makedirs(self.workdir, exist_ok=True)
if self.can_skip():
log.info(f"Skipping calculation {j(self.rundir, self.name)}, it is already finished or currently pending or running.")
return
# write the post-script calls to the post-ambles:
if self.delete_on_finish:
self.add_postamble(f"rm -r {self.workdir}")
if self.delete_on_fail:
self.add_postamble("# this will delete the calculation if it failed")
self.add_postamble(f"if [[ `tc read -s {self.workdir}` = FAILED || `tc read -s {self.workdir}` = UNKNOWN ]]; then rm -r {self.workdir}; fi;")
for postscript in self._postscripts:
self._postambles.append(f'{_python_path()} {postscript[0]} {" ".join(postscript[1])}')
        # set up the job and check whether it was successful
setup_success = self._setup_job()
if self.test_mode or not setup_success:
return
if slurm.has_slurm() and self.use_slurm:
# set some default sbatch settings
            if all(option not in self._sbatch for option in ["D", "chdir"]):
                self._sbatch.setdefault("D", self.workdir)
            if all(option not in self._sbatch for option in ["J", "job_name"]):
                self._sbatch.setdefault("J", f"{self.rundir}/{self.name}")
            if all(option not in self._sbatch for option in ["o", "output"]):
                self._sbatch.setdefault("o", f"{self.name}.out")
self._sbatch.prune()
# submit the job with sbatch
sbatch_result = slurm.sbatch(os.path.split(self.runfile_path)[1], **self._sbatch)
# store the slurm job ID
self.slurm_job_id = sbatch_result.id
# and write the command to a file so we can rerun it later
with open(j(self.workdir, "submit.sh"), "w+") as cmd_file:
cmd_file.write(sbatch_result.command)
# make the submit command executable
os.chmod(j(self.workdir, "submit.sh"), stat.S_IRWXU)
            # if we requested to wait for the job, block until the slurm job has finished
if self.wait_for_finish:
slurm.wait_for_job(self.slurm_job_id)
else:
os_name = get_os_name()
if os_name == OSName.WINDOWS:
raise TCJobError("Generic Job", "Running jobs on Windows is not supported.")
# if we are not using slurm, we can execute the file. For this we need special permissions, so we have to set that first.
os.chmod(self.runfile_path, os.stat(self.runfile_path).st_mode | stat.S_IEXEC)
runfile_dir, runscript = os.path.split(self.runfile_path)
command = ["./" + runscript] if os.name == "posix" else ["sh", runscript]
print(f"Running command: {command} in directory: {runfile_dir}")
with open(f"{os.path.split(self.runfile_path)[0]}/{self.name}.out", "w+") as out:
sp.run(command, cwd=runfile_dir, stdout=out, shell=True)
def add_preamble(self, line: str):
"""
        Add a preamble to the runscript. This comes after the shebang, but before the calculation is run by the program (ADF or ORCA).
        This can be used, for example, to load some modules. E.g. to load a specific version of AMS we can call:
        ``job.add_preamble('module load ams/2023.101')``
"""
self._preambles.append(line)
def add_postamble(self, line: str):
"""
        Add a postamble to the runscript. This comes after the calculation is run by the program (ADF or ORCA).
        This can be used, for example, to remove or copy some files. E.g. to remove all t12.* files we can call:
        ``job.add_postamble('rm t12.*')``
"""
self._postambles.append(line)
def add_postscript(self, script, *args):
"""
Add a post-script to this calculation.
        This should be either a Python module with a ``__file__`` attribute or the path to a Python script.
        The post-script will be called with Python, and any given args will be added as command-line arguments when calling the script.
Args:
script: a Python object with a __file__ attribute or the file-path to a script.
*args: positional arguments to pass to the post-script.
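
        Example:
            A small sketch; the script path and its argument are hypothetical placeholders:

            .. code-block:: python

                # run a custom analysis script (hypothetical path) with one extra argument after the calculation
                job.add_postscript('/home/user/scripts/analyze_results.py', '--verbose')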
"""
if not isinstance(script, str):
script = script.__file__
self._postscripts.append((script, args))
def dependency(self, otherjob: "Job"):
"""
Set a dependency between this job and otherjob.
        This means that this job will only start running after the other job has finished running successfully.
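
        Example:
            A minimal sketch; ``job1`` and ``job2`` stand for two jobs you have already set up:

            .. code-block:: python

                # job2 will only start after job1 has finished successfully
                job2.dependency(job1)
                job2.run()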
"""
if otherjob.can_skip() and not otherjob.in_queue():
return
        if otherjob.slurm_job_id is not None:
self.sbatch(dependency=f"afterok:{otherjob.slurm_job_id}")
self.sbatch(kill_on_invalid_dep="Yes")
@property
def workdir(self):
"""
The working directory of this job. All important files are written here, for example the input file and runscript.
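
        Example:
            A short illustration; the run directory and name below are placeholders:

            .. code-block:: python

                job.rundir = './calculations'
                job.name = 'water_sp'
                print(job.workdir)  # e.g. /home/user/project/calculations/water_sp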
"""
return j(os.path.abspath(self.rundir), self.name)
@property
def runfile_path(self):
"""
The file path to the runscript of this job.
"""
return j(self.workdir, f"{self.name}.run")
@property
def inputfile_path(self):
"""
The file path to the input file of this job.
"""
return j(self.workdir, f"{self.name}.in")
@property
def output_mol_path(self):
"""
        This property should return the path to the output molecule, if it makes sense for the job type.
        E.g. for ADF optimization jobs it will be output.xyz in the workdir.
"""
        raise NotImplementedError("You must implement the output_mol_path property in your subclass.")
def molecule(self, mol: Union[str, plams.Molecule, plams.Atom, List[plams.Atom]]):
"""
Add a molecule to this calculation in various formats.
Args:
            mol: the molecule to read. This can be a path (str); if the file already exists it is read immediately, otherwise it will be read later. It can also be a plams.Molecule object, a single plams.Atom, or a list of plams.Atom objects.
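
        Example:
            A short sketch; ``water.xyz`` is a placeholder for your own coordinate file:

            .. code-block:: python

                # read a molecule directly from an xyz file (placeholder path)
                job.molecule('water.xyz')

                # or build and pass a plams.Molecule object
                mol = plams.Molecule()
                mol.add_atom(plams.Atom(symbol='O', coords=(0.0, 0.0, 0.0)))
                job.molecule(mol)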
"""
if isinstance(mol, plams.Molecule):
self._molecule = mol
elif isinstance(mol, str) and os.path.exists(mol):
self._molecule = molecule.load(mol)
elif isinstance(mol, str):
self._molecule_path = os.path.abspath(mol)
elif isinstance(mol, list) and isinstance(mol[0], plams.Atom):
self._molecule = plams.Molecule()
            for atom in mol:
                self._molecule.add_atom(atom)
elif isinstance(mol, plams.Atom):
self._molecule = plams.Molecule()
self._molecule.add_atom(mol)
def copy(self):
"""
Make and return a copy of this object.
"""
import copy
cp = Job()
# cast this object to a list of keys and values
lsts = dictfunc.dict_to_list(self.__dict__)
        # copy everything in the lists
lsts = [[copy.copy(x) for x in lst] for lst in lsts]
        # and update the new Job object with the copied values before returning it
cp.__dict__.update(results.Result(dictfunc.list_to_dict(lsts)))
return cp