import inspect
import json
import os
import sys
from datetime import datetime
from math import ceil
from time import perf_counter
from typing import Any, Generator, Iterable, List, Sequence, TypeVar, Union
import numpy as np
from tcutility import ensure_2d
# from threading import Thread
###########################################################
# MODULE LEVEL VARIABLES USED TO CHANGE LOGGING BEHAVIOUR #
logfile = sys.stdout # stream or file to send prints to. Default sys.stdout is the standard Python print output
errfile = sys.stderr # stream or file to send prints to. Default sys.stderr is the standard Python print output
tab_level = 0 # number of tabs to add before a logged message. This is useful to build a sense of structure in a long output
max_width = 0 # maximum width of printed messages. Default 0 means unbounded
print_date = True # print time stamp before a logged message
log_level = 20 # the level of verbosity. Messages with a log_level >= this number will be sent to logfile
[docs]
class Emojis:
"""
Class containing some useful emojis and other characters.
Supports dot-notation and indexation to get a character.
E.g. ``Emojis.wait == Emojis['wait']``
"""
wait = "đ"
good = "â
"
cancel = "đ"
sleep = "đ¤"
fail = "â"
send = "đ¤"
receive = "đĨ"
empty = "â â "
finish = "đ"
warning = "â ī¸"
question = "â"
info = "âšī¸"
rarrow = "â>"
larrow = "<â"
lrarrow = rlarrow = "<â>"
angstrom = "Ã
"
def __init__(self):
raise SyntaxError("Do not instantiate the Emojis-class.")
def __class_getitem__(cls, item):
return getattr(cls, item)
[docs]
class NoPrint:
"""
Context-manager that suppresses printing. It works by redirecting prints to a temporary output file.
This file is deleted after exiting the context-manager.
"""
def __init__(self, stdout=None, stderr=None):
self.stdout = stdout or logfile
self.stderr = stderr or errfile
self.overflow = open(str(os.getpid()) + "_NOPRINT.tmp", "w+")
def __enter__(self):
global logfile, errfile
logfile = self.overflow
errfile = self.overflow
def __exit__(self, *args, **kwargs):
global logfile, errfile
logfile = self.stdout
errfile = self.stderr
self.overflow.close()
os.remove(str(os.getpid()) + "_NOPRINT.tmp")
[docs]
def time_stamp():
"""
Return the current timestamp in a "[YYYY/MM/DD HH:MM:SS] "" format.
"""
now = datetime.now()
return f"[{now.year}/{str(now.month).zfill(2)}/{str(now.day).zfill(2)} {str(now.hour).zfill(2)}:{str(now.minute).zfill(2)}:{str(now.second).zfill(2)}] "
[docs]
def log(message: Any = "", level: int = 20, end: str = "\n"):
r"""
Print a nicely formatteed message.
This function adds the current timestamp and supports multi-line printing (split on the ``\n`` escape character).
For verbosity levels we use the following convention:
.. code-block::
NOTSET = 0
DEBUG = 10
INFO = 20
WARN = 30
ERROR = 40
CRITICAL = 50
Args:
message: the message to send. Before printing we will use the ``message.__str__`` method to get the string representation. If the message is a ``dict`` we use the ``json`` module to format the message nicely.
level: the level to print the message at. We compare the level against the module-wide ``log_level`` variable (by default ``log_level = 20``). If the level is below ``log_level`` we do not print it.
end: the end of the string. This is usually the new-line character ``\n``.
"""
if level < log_level:
return
# dictionaries are handled specially so that they look nice
if isinstance(message, dict):
message = json.dumps(message, indent=4, sort_keys=True)
# Print each line separately
splitted_message = str(message).split("\n")
for m in splitted_message:
# handle lines that exceed the maximum print width
if max_width > 0 and len(m) > max_width:
m = m[: max_width - 4] + " ..."
# add the tab-levels and timestamp
m = "\t" * tab_level + m
if print_date:
m = time_stamp() + m
print(m, file=logfile, end=end, flush=True)
[docs]
def flow(message: str = "", tags: List[str] = ["straight"], level: int = 20) -> None:
"""
Function to create flowchart-like output.
It will print a message prepended by flow elements (arrows and lines).
The flow elements are determined based on the given tags.
"""
elements = {
"start": "⯠",
"startinv": "⡠",
"end": "â°â> ",
"straight": "â ",
"split": "ââ> ",
"skip": " ",
"vert": "ââââ",
"endvert": "â°ââ> ",
"splitvert": "âââ> ",
}
s = ""
for tag in tags:
s += elements[tag]
log(s + message, level=level)
[docs]
def table(rows: List[List[Any]], header: Union[List[str], None] = None, sep: str = " ", hline: List[int] = [], level: int = 20) -> str:
r"""
Print a table given rows and a header. Values in rows will be cast to strings first.
Args:
rows: list of `nrows` sequences containing `ncols` data inside the table.
header: list of `ncols` strings that represent the column names of the table. They will be printed at the top of the table.
sep: str representing the separation between columns.
hline: list of integers specifying rows after which lines will be drawn. Supports negative indices, e.g. -1 will draw a line at the end of the table.
Returns:
str: the table in string format, where lines are separated by "\n"
"""
rows = ensure_2d(rows)
nrows = len(rows)
ncols = len(rows[0])
hline = [h + nrows + 1 if h < 0 else h for h in hline]
if header:
rows = [header] + rows
hline = [0] + hline
rows = [[str(x) for x in row] for row in rows]
column_sizes = [max(len(row[i]) for row in rows) for i in range(ncols)]
return_str = ""
for i, row in enumerate(rows):
row_str = sep.join([x.ljust(column_size) for x, column_size in zip(row, column_sizes)]) + "\n"
return_str += row_str
if i in hline:
return_str += "â" * len(row_str) + "\n"
log(return_str, level=level)
return return_str
[docs]
def rectangle_list(values: Sequence, spaces_before: int = 0, level: int = 20):
"""
This function prints a list of strings in a rectangle to the output.
This is similar to what the ls program does in unix.
"""
n_shell_col = os.get_terminal_size().columns
# we first have to determine the correct dimensions of our rectangle
for ncol in range(1, n_shell_col):
# the number of rows for the number of columns
nrows = ceil(len(values) / ncol)
# we then get what the rectangle would be
mat = [[str(x) for x in values[i * ncol : (i + 1) * ncol]] for i in range(nrows)]
# and determine for each column the width
col_lens = [max([len(row[i]) for row in mat if i < len(row)] + [0]) for i in range(ncol)]
# then calculate the length of each row based on the column lengths
# we use a spacing of 2 spaces between each column
row_len = 22 * print_date + spaces_before + sum(col_lens) + 2 * len(col_lens) - 2
# if the rows are too big we exit the loop
if row_len > n_shell_col:
break
# store the previous loops results
prev_col_lens = col_lens
prev_mat = mat
# then print the strings with the right column widths
for row in prev_mat:
log(" " * spaces_before + " ".join([x.ljust(col_len) for x, col_len in zip(row, prev_col_lens)]), level=level)
T = TypeVar("T")
[docs]
def loadbar(sequence: Union[Iterable[T], Sequence[T]], comment: str = "", Nsegments: int = 50, Nsteps: int = 10, level: int = 20) -> Generator[T, None, None]:
"""
Return values from an iterable ``sequence`` and also print a progress bar for the iteration over this sequence.
Args:
sequence: any iterable sequence. Should define the ``__len__`` method.
comment: a string to be printed at the end of the loading bar to give information about the loading bar.
Nsegments: length of the loading bar in characters.
Nsteps: number of times to print the loading bar during iteration. If the output is a tty-type stream Nsteps will be set to the length of sequence.
"""
if not isinstance(sequence, Sequence):
chars = ["â ", "â ", "â ", "â Ļ", "â §", "â ˇ", "â ŋ", "â ģ", "â ", "â ", "â ", "â ", "â "]
iteration = 0
if not logfile.isatty() and comment:
log(comment, level=level)
starttime = perf_counter()
for val in sequence:
iteration += 1
elapsed_time = perf_counter() - starttime
time_per_step = elapsed_time / iteration
# every 0.1 seconds we change the character
char_step = int(elapsed_time / 0.1) % len(chars)
if logfile.isatty():
log(f"{chars[char_step]} {comment} [Steps: {iteration}, Elapsed: {elapsed_time:.1f}s]", end="\r", level=level)
yield val
log(level=level)
return
N = len(sequence)
# if the output stream is tty-type we set the number of steps to the lenth of the sequence so the loading bar looks smoother
Nsteps = N if logfile.isatty() else Nsteps
Ndigits = int(np.log10(N)) + 1 # well-known method to get number of digits of an integer
# we track what the maximum length of the loading bar is.
# We use the '\r' return carriage when logging, so we have to overwrite the whole previous line.
# For this we must know the largest length of our loading bar strings.
max_length = 0
# track when the loading bar started. We use this to calculate the ETA later
loading_bar_start_time = perf_counter()
for i, val in enumerate(sequence):
# we only print the loading bar on every Nsteps iterations
if i % (N // min(N, Nsteps)) != 0:
yield val
continue
# calculate how many segments should be filled and empty
Nfilled_segments = int(i / N * Nsegments)
filled = "â" * Nfilled_segments
empty = " " * (Nsegments - Nfilled_segments)
# determine what the cursor should look like
if Nfilled_segments == Nsegments: # if the bar is fully filled it should be just a stripe
cursor = "â"
elif i == 0: # if the bar is empty we don't draw the cursor
cursor = " "
else: # normal case
cursor = ">"
# calculate the ETA. In the first iteration we cannot know yet
if i == 0:
eta = "???"
else:
# the algorithm simply extrapolates the current time taken to the case of the final iteration
time_taken = perf_counter() - loading_bar_start_time
time_per_step = time_taken / i
time_left = time_per_step * (N - i)
eta = f"{time_left:.1f}"
# build the loading bar string
s = f'{i:{Ndigits}}/{N} {"â" if i > 0 else "|"}{filled + cursor + empty}{"â"} {i/N:4.0%} {comment} [ETA: {eta}s]'.ljust(max_length)
# update the max loading bar string length
max_length = max(len(s), max_length)
log(s, end="\r", level=level)
yield val
# plot the final iteration at 100% filled
s = f'{i+1:{Ndigits}}/{N} {"â"}{"â"*(Nsegments+1)}⤠100% {comment}'.ljust(max_length)
log(s, end="\r", level=level)
log(level=level)
[docs]
def boxed(message: str, title: Union[str, None] = None, message_align: str = "left", title_align: str = "left", round_corners: bool = True, double_edge: bool = False, level: int = 20) -> None:
r"""
Print a message surrounded by a box with optional title.
Args:
message: the message to place in the box. Multiline messages are separated by "\n".
title: the title placed in the top edge of the box.
message_align: alignment of the text inside the box. One of ["left", "center", "right"].
title_align: alignment of the title. One of ["left", "center", "right"].
round_corners: whether the corners of the box should be rounded or not. Rounded corners are only available for single-edge boxes.
double_edge: whether the edges of the box should be double.
Returns:
The printed message in strings format.
"""
# define the edges and corners
straights = ["â", "â"]
if round_corners and not double_edge:
corners = ["â", "âŽ", "â¯", "â°"]
elif double_edge:
corners = ["â", "â", "â", "â"]
straights = ["â", "â"]
else:
corners = ["â", "â", "â", "â"]
# get the width the box should have
messages = message.split("\n")
# the length is influenced by both the messages as well as the title
maxlen = max(max(len(message) for message in messages), len(title or ""))
# build first row containing the title
if title is not None:
if title_align == "left":
s = corners[0] + (" " + title + " ").ljust(maxlen + 2, straights[1]) + corners[1] + "\n"
elif title_align == "right":
s = corners[0] + (" " + title + " ").rjust(maxlen + 2, straights[1]) + corners[1] + "\n"
else:
s = corners[0] + (" " + title + " ").center(maxlen + 2, straights[1]) + corners[1] + "\n"
else:
s = corners[0] + straights[1] * (maxlen + 2) + corners[1] + "\n"
# build main body of box
for message in messages:
if message_align == "left":
s += f"{straights[0]} " + message.ljust(maxlen) + f" {straights[0]}\n"
if message_align == "right":
s += f"{straights[0]} " + message.rjust(maxlen) + f" {straights[0]}\n"
if message_align == "center":
s += f"{straights[0]} " + message.center(maxlen) + f" {straights[0]}\n"
# build bottom row
s += corners[3] + straights[1] * (maxlen + 2) + corners[2]
log(s, level=level)
[docs]
def debug(message: str, level: int = 10, caller_level: int = 2):
"""
Print a debug message.
"""
log(f"[DEBUG]({caller_name(caller_level)}): " + message, level=level)
[docs]
def info(message: str, level: int = 20, caller_level: int = 2):
"""
Print an informative message.
"""
log(f"[INFO]({caller_name(caller_level)}): " + message, level=level)
[docs]
def warn(message: str, level: int = 30, caller_level: int = 2):
"""
Print a warning message.
"""
log(f"[WARNING]({caller_name(caller_level)}): " + message, level=level)
[docs]
def error(message: str, level: int = 40, caller_level: int = 2):
"""
Print an error message.
"""
log(f"[ERROR]({caller_name(caller_level)}): " + message, level=level)
[docs]
def critical(message: str, level: int = 50, caller_level: int = 2):
"""
Print a critical message.
"""
log(f"[CRITICAL]({caller_name(caller_level)}): " + message, level=level)
[docs]
def caller_name(level: int = 1) -> str:
"""
Return the full name of the caller of a function.
Args:
level: the number of levels to skip when getting the caller name. Level 1 is always this function. When used by a different function it should be set to 2. E.g. when using the log.warn function level is set to 2.
Returns:
The full name of the caller function.
"""
stack = inspect.stack()
names = [stack[level][3]] # add the original function, this should be at a certain levels, since this function is called by warn or info or error
for frame in stack[level:]:
if frame.function == "<module>": # <module> is always there, so we can skip it
continue
# get the more interesting names
# first check if the
if hasattr(frame[0], "f_locals") and "self" in frame[0].f_locals:
clas = frame[0].f_locals["self"].__class__
module = clas.__module__
if module == "builtins":
names.append(clas.__qualname__)
else:
names.append(module + "." + clas.__qualname__)
elif hasattr(frame[0], "f_code"):
names.append(frame[0].f_code.co_name)
names_ordered = []
for name in names[::-1]:
if name not in names_ordered:
names_ordered.append(name)
return ".".join(names_ordered)
if __name__ == "__main__":
boxed("testing, 1, 2, 3, 4, 5, 6, 7, 8\nsecond row, 1, 2, 3, 4, 5, 6\n...\n...\n\n...\n...\nLast row here", title="ReactionRunner")
with NoPrint():
log("test test test")
log("test test test")
log("test test test")
log("test test test")
log()
flow("Start", ["start"])
flow()
flow("First step", ["split"])
flow("First substep of first step", ["straight", "split"])
flow("Second substep of first step", ["straight", "split"])
flow("Third and final substep of first step", ["straight", "end"])
flow()
flow("Second step", ["split"])
flow("Substep of second step", ["straight", "split"])
flow("Substep of substep of second step", ["straight", "straight", "end"])
flow("Substep of substep of substep of second step", ["straight", "straight", "skip", "end"])
flow("Substep of second step", ["straight", "end"])
flow()
flow("Final step", ["split"])
flow()
flow(f"{Emojis.good} The end", ["startinv"])
log()
info("This is important info")
warn("This is an important warning!")
import time
for x in loadbar(range(100), "Sleeping test"):
time.sleep(0.01)
x = np.linspace(1, 12, 12)
rows = np.vstack([x, x**2, x**3, x**4, x**5]).astype(int).T.tolist()
table(rows, header=["X", "y=x^2", "y=x^3", "y=x^4", "y=x^5"], hline=[-1])
from tcutility import log as cli_log
class TestClass:
def test_method(self):
cli_log.warn("I am testing the warning function")
TestClass().test_method()
rectangle_list(range(1000))