Source code for tcutility.slurm
import os
import platform
import subprocess as sp
import time
from tcutility import cache, log, results
[docs]
@cache.cache
def has_slurm() -> bool:
"""
Function to check if the current platform uses slurm.
Returns:
Whether slurm is available on this platform.
"""
try:
# Determine the appropriate command based on the OS
command = ["which", "sbatch"] if platform.system() != "Windows" else ["where", "sbatch"]
# we do not want this function to print anything when it does not find sbatch
with open(os.devnull, "wb") as devnull:
sp.check_output(command, stderr=devnull).decode()
# if it runs without error, we have access to slurm
return True
# if an error is raised we do not have slurm
except (sp.CalledProcessError, FileNotFoundError):
return False
[docs]
@cache.timed_cache(3)
def squeue() -> results.Result:
"""
Get information about jobs managed by slurm using squeue.
Returns:
: A :class:`Result <tcutility.results.result.Result>` object containing information about the calculation status:
- ``directory`` **(list[str])** – path to slurm directories.
- ``id`` **(list[str])** – slurm job id's.
- ``status`` **(list[str])** – slurm job status name. See squeue documentation.
- ``statuscode`` **(list[str])** – slurm job status codes. See squeue documentation
.. note::
By default this function uses a timed cache (see :func:`timed_cache <tcutility.cache.timed_cache>`) with a 3 second delay to lessen the load on HPC systems.
"""
ret = results.Result()
if not has_slurm():
return ret
# specify the columns to get here
columns = ["directory", "id", "statuscode", "status"]
options = ["%Z", "%A", "%t", "%T"] # these are the squeue format codes
# set each column as an empty list in the return object
for col in columns:
ret[col] = []
# run the squeue command with the formatting options
output = sp.check_output(["squeue", "--me", "--format", "" + " ".join(options) + ""]).decode()
output = [line for line in output.splitlines()[1:] if line.strip()]
# then add the data to the return object's lists
for line in output:
[ret[col].append(val) for col, val in zip(columns, line.split())]
return ret
[docs]
def sbatch(runfile: str, **options: dict) -> results.Result:
"""
Submit a job to slurm using sbatch.
Args:
runfile: the path to the filename to be submitted.
options: options to be used for sbatch.
Returns:
: A :class:`Result <tcutility.results.result.Result>` object containing information about the newly submitted slurm job
- ``id`` **(str)** - the ID for the submitted slurm job.
- ``command`` **(str)** - the command used to submit the job.
"""
cmd = "sbatch "
for key, val in options.items():
key = key.replace("_", "-")
if val is True:
if len(key) > 1:
cmd += f"--{key} "
else:
cmd += f"-{key} "
else:
if len(key) > 1:
cmd += f"--{key}={val} "
else:
cmd += f"-{key} {val} "
cmd = cmd + runfile
ret = results.Result()
ret.command = cmd
# run the job
sbatch_out = sp.check_output(cmd.split(), stderr=sp.STDOUT).decode()
# get the slurm job id from the output
for line in sbatch_out.splitlines():
if "Submitted batch job" in line:
# set the slurm job id for this calculation, we use this in order to set dependencies between jobs.
ret.id = line.strip().split()[-1]
break
return ret
[docs]
def workdir_info(workdir: str) -> results.Result:
"""
Function that gets squeue information given a working directory. This will return None if the directory is not being actively referenced by slurm.
Returns:
:Result object containing information about the calculation status, see :func:`squeue`.
"""
if not has_slurm():
return None
sq = squeue()
if workdir not in sq.directory:
return None
workdir_index = sq.directory.index(workdir)
ret = results.Result()
for key, vals in sq.items():
ret[key] = vals[workdir_index]
return ret
[docs]
def wait_for_job(slurmid: int, check_every: int = 3):
"""
Wait for a slurm job to finish. We check every `check_every` seconds if the slurm job id is still present in squeue.
Args:
slurmid: the ID of the slurm job we are waiting for.
check_every: the amount of seconds to wait before checking squeue again. Don't put this too high, or you will anger the cluster people.
"""
while slurmid in squeue().id:
time.sleep(check_every)
if __name__ == "__main__":
if has_slurm():
log.info("This platform has SLURM.")
else:
log.info("This platform does not have SLURM.")