Source code for tcmu.cli_scripts.workflow

from pprint import pprint
from typing import List

import click

from tcmu import workflow_db, log
from tcmu.job.workflow import WorkFlowOutput
import time
import datetime

LINE_UP = '\033[1A'
LINE_CLEAR = '\x1b[2K'


[docs] def clear_lines(n): for _ in range(n): print(LINE_UP, end=LINE_CLEAR)
@click.group("workflow") def workflow(): ''' The ``tcmu workflow`` subcommand gives a few tools for reading the statuses of, and clearing of past workflow runs. ''' pass @click.command("status") @click.option("-h", "--hash", "use_hash", is_flag=True) @click.option("-x", "--exit", is_flag=True) @click.argument("name", required=False) def status(use_hash: bool = False, name: str = None, exit: bool = False): ''' Read the statuses of completed and currently running workflow runs. Use the ``name`` argument to specify a specific workflow type or a specific hash when the ``-h/--hash`` flag is turned on. If ``name`` is not given, prints the statuses of all workflow runs and provides an overview of the number of runs per workflow. If the ``-x/--exit`` flag is set print the status once and then exit immediately. Otherwise, it will update every second. ''' def time_sort(time_str): if time_str == '': return 0 days = 0 hours = 0 if '-' in time_str: days, time_str = time_str.split('-') if time_str.count(':') == 2: hours, time_str = time_str.split(':', 1) minutes, seconds = time_str.split(':') return int(days) * 24 * 60 * 60 + int(hours) * 60 * 60 + int(minutes) * 60 + int(seconds) def sort_rows(rows): # sort by name first if name is None: rows = list(sorted(rows, key=lambda row: row[3])) else: rows = list(sorted(rows, key=lambda row: row[2])) # then sort by status # finished go on top # pending go in the middle # running goes on bottom misc_rows = [] pend_rows = [] run_rows = [] for row in rows: if row[0] not in ['PENDING', 'RUNNING']: misc_rows.append(row) if row[0] == 'PENDING': pend_rows.append(row) if row[0] == 'RUNNING': run_rows.append(row) run_rows = list(sorted(run_rows, key=lambda row: time_sort(row[-2]))) new_rows = misc_rows + pend_rows + run_rows return new_rows def get_str(): s = '' if use_hash: s += workflow_db.get_status(name) + '\n' else: log.print_date = False rows = [] status_counts = {} workflow_name_counts = {} for hsh, data in workflow_db.read_all().items(): if name is None or data.get('workflow_name', None) == name: status = data.get('status', 'UNKOWN') status_counts.setdefault(status, 0) status_counts[status] += 1 workflow_name = data.get('workflow_name', '') workflow_name_counts.setdefault(workflow_name, 0) workflow_name_counts[workflow_name] += 1 start_time = data.get('start_time', None) if start_time is not None and status == 'RUNNING': t = datetime.datetime.strptime(start_time, '%Y-%m-%d-%H-%M-%S') td = datetime.datetime.now() - t run_time = '' if td >= datetime.timedelta(days=1): run_time += f'{td.days}-' if td >= datetime.timedelta(hours=1): run_time += f'{td.seconds//(60*60):02}:' run_time += f'{td.seconds//60%60:02}:{td.seconds%60:02}' else: run_time = '' if name is None: rows.append((status, workflow_name, data.get('slurm_job_id', ''), hsh, run_time, data.get('stage', ''))) else: rows.append((status, data.get('slurm_job_id', ''), hsh, run_time, data.get('stage', ''))) if len(rows) == 0: if name is None: return f'I could not find any workflow runs.\n' else: return f'I could not find any runs for WorkFlow({name}).\n' if name is None: s += f'Found {sum(list(status_counts.values()))} total run(s).' + '\n\n' s += f' From the following workflows:' + '\n' for workflow_name, nums in workflow_name_counts.items(): s += f' {nums:>5} {workflow_name}\n' else: s += f'Found {sum(list(status_counts.values()))} total run(s) for WorkFlow({name}):\n\n' s += f' With the following statuses:\n' for status, nums in status_counts.items(): s += f' {nums:>5} {status}\n' s += '\n' rows = sort_rows(rows) if name is None: s += log.table(rows, header=('Status', 'Workflow', 'SlurmJobID', 'Hash', 'RunTime', 'Stage'), as_str=True) else: s += log.table(rows, header=('Status', 'SlurmJobID', 'Hash', 'RunTime', 'Stage'), as_str=True) return s + '\n[Ctrl+C] to exit\n' s = get_str() print(s) if exit: return previous_n_lines = len(s.splitlines()) + 1 while True: s = get_str() clear_lines(previous_n_lines) print(s) time.sleep(1) n_lines = len(s.splitlines()) previous_n_lines = len(s.splitlines()) + 1 workflow.add_command(status) @click.command("clear") @click.option("-h", "--hash", "use_hash", is_flag=True) @click.argument("name", required=True) def clear(use_hash: bool = False, name: str = None): ''' Clears data related to workflow runs. Use the ``name`` argument to specify a specific workflow type or a specific hash when the ``-h/--hash`` flag is turned on. ''' if use_hash: workflow_db.delete(name) return hashes = [] for hsh, data in workflow_db.read_all().items(): if data.get('workflow_name', None) == name: hashes.append(hsh) if len(hashes) == 0: print(f'I could not find any runs for WorkFlow({name}).') return proceed = input(f'This will delete {len(hashes)} workflow runs; proceed? (y/[n]): ') if proceed == 'y': for hsh in hashes: workflow_db.delete(hsh) print() else: print('Cancelling ...') workflow.add_command(clear) @click.command("where") def where(): ''' Prints where workflow data is stored. ''' print(f'CACHEDIR: {workflow_db.CACHEDIR}') print(f'DBPATH: {workflow_db.DBPATH}') workflow.add_command(where)