Source code for tcmu.job.workflow_db
import os
import platformdirs
from typing import List, Tuple, Dict
import tcmu
from filelock import FileLock
CACHEDIR = platformdirs.user_cache_dir(appname="tcmu", appauthor="TheoCheMVU", ensure_exists=True)
DBPATH = CACHEDIR + '/workflows.csv'
DBPATH_LOCK = FileLock(CACHEDIR + '/workflows.csv.lock')
# DBPATH = 'test.csv'
# create a new db file if it doesnt exist yet
if not os.path.exists(DBPATH):
with open(DBPATH, 'w+'):
...
#### BASIC FUNCTIONS
[docs]
def write(hsh: str, **kwargs):
s = f'{hsh}'
for k, v in kwargs.items():
s += f', {k}={v}'
s += '\n'
with DBPATH_LOCK:
with open(DBPATH, 'a') as db:
db.write(s)
[docs]
def read(hsh: str) -> dict:
'''
Get the status of a workflow with specific args and kwargs.
'''
# default status is unknown
for _hsh, data in read_all().items():
if hsh == _hsh:
return data
return {}
[docs]
def parse_line(line: str) -> Tuple[str, dict]:
'''
Read information from a line from the database.
'''
# the hsh is always the first entry
hsh = line.split(',')[0]
data = {}
# read anything after the hash
for part in line.split(',')[1:]:
parts = part.split('=')
if len(parts) == 2:
k, v = parts
data[k.strip()] = v.strip()
else:
k = parts[0]
v = None
data[k.strip()] = None
# return the hash and data separately
return hsh, data
[docs]
def read_all() -> Dict[str, dict]:
'''
Return all lines that are in the database.
'''
with DBPATH_LOCK:
with open(DBPATH) as db:
lines = db.readlines()
# parse the lines we found
parsed_lines = [parse_line(line) for line in lines]
# and construct a dictionary
return {hsh: data for hsh, data in parsed_lines}
[docs]
def read_remote(server: tcmu.connect.Connection) -> Dict[str, dict]:
'''
Return all lines that are in the database.
'''
file = server.download('.cache/tcmu/workflows.csv', 'workflows.csv')
with open('workflows.csv') as db:
lines = db.readlines()
# parse the lines we found
parsed_lines = [parse_line(line) for line in lines]
# and construct a dictionary
return {hsh: data for hsh, data in parsed_lines}
[docs]
def update(hsh: str, **kwargs) -> None:
'''
Update a record in the database associated with the given hash.
'''
data = read(hsh)
data.update(kwargs)
delete(hsh)
write(hsh, **data)
[docs]
def delete(hsh: str) -> None:
'''
Delete records related to the given hash.
'''
# make a list of lines that we will rewrite
# all_data = read_all()
# all_data.pop(hsh, None)
# for _hsh, data in all_data.items():
# write(_hsh, data)
with DBPATH_LOCK:
with open(DBPATH) as db:
lines = db.readlines()
new_lines = [line for line in lines if line.split(',')[0] != hsh]
with DBPATH_LOCK:
with open(DBPATH, 'w+') as db:
for line in new_lines:
db.write(line)
# #### CONVENIENCE FUNCTIONS
[docs]
def get_status(hsh: str) -> str:
'''
Get the status of a workflow with specific args and kwargs.
'''
return read(hsh).get('status', None)
[docs]
def get_workflow_name(hsh: str) -> str:
'''
Get the status of a workflow with specific args and kwargs.
'''
return read(hsh).get('workflow_name', None)
def can_skip(hsh: str, server: tcmu.connect.Server = tcmu.connect.Local()) -> bool:
'''
Checks if a workflow with specific args and kwargs has finished.
'''
status = get_status(hsh)
# if the status indicates the workflow already ran we can skip
if status in ['SUCCESS', 'FAILED']:
return True
# if the workflow is still running we need to check if it
# is being managed by slurm
elif status == 'RUNNING':
data = read(hsh)
# if the workflow is managed by slurm it should have a slurm-job-id
slurm_job_id = data.get('slurm_job_id', None)
# if it does not we can assume it failed
if slurm_job_id is None:
return False
# if it does, we need to check if it is in the queue
sq = tcmu.slurm.squeue(server=server)
# if it is being managed by slurm we can skip it
# otherwise something went wrong and the status was not updated
return slurm_job_id in sq['id']
return False
[docs]
def set_status(hsh: str, new_status: str) -> None:
'''
Checks if a workflow with specific args and kwargs has finished.
'''
update(hsh, status=new_status)
[docs]
def set_running(hsh: str) -> None:
'''
Checks if a workflow with specific args and kwargs has finished.
'''
update(hsh, status='RUNNING')
[docs]
def set_finished(hsh: str) -> None:
'''
Checks if a workflow with specific args and kwargs has finished.
'''
update(hsh, status='SUCCESS')
[docs]
def set_failed(hsh: str) -> None:
'''
Checks if a workflow with specific args and kwargs has finished.
'''
update(hsh, status='FAILED')