"""Progress bar for visualising progress of simulation runs."""
import html
import os
import platform
import sys
from timeit import default_timer as timer
from typing import Dict, Optional, TextIO, Union
try:
from IPython import get_ipython
from IPython.display import display as ipython_display
IPYTHON_AVAILABLE = True
except ImportError:
IPYTHON_AVAILABLE = False
def _in_zmq_interactive_shell() -> bool:
"""Check if in interactive ZMQ shell which supports updateable displays"""
if not IPYTHON_AVAILABLE:
return False
else:
try:
shell = get_ipython().__class__.__name__
if shell == "ZMQInteractiveShell":
return True
elif shell == "TerminalInteractiveShell":
return False
else:
return False
except NameError:
return False
def _in_shell_with_ansi_support() -> bool:
"""Check if running in shell with support for ANSI escape characters.
Based on https://gist.github.com/ssbarnea/1316877
"""
return (
(
hasattr(sys.stdout, "isatty")
and sys.stdout.isatty()
and platform.system() != 'Windows'
)
or os.environ.get('TERM') == "ANSI"
or os.environ.get('PYCHARM_HOSTED') == "1"
)
def _create_display(obj):
"""Create an updateable display object.
:param obj: Initial object to display.
:return: Object with `update` method to update displayed content.
"""
if _in_zmq_interactive_shell():
return ipython_display(obj, display_id=True)
else:
display = (
AnsiStreamDisplay() if _in_shell_with_ansi_support()
else BasicStreamDisplay()
)
display.update(obj)
return display
def _format_time(total_seconds: Union[int, float]) -> str:
"""Format a time interval in seconds as a colon-delimited string [h:]m:s"""
total_mins, seconds = divmod(int(total_seconds), 60)
hours, mins = divmod(total_mins, 60)
if hours != 0:
return f"{hours:d}:{mins:02d}:{seconds:02d}"
else:
return f"{mins:02d}:{seconds:02d}"
def _format_stat(stat) -> str:
"""Format numeric stat at fixed precision otherwise convert to string."""
if isinstance(stat, (int, float)):
return f"{stat:.4g}"
else:
return str(stat)
[docs]
class ProgressBar:
"""Iterable object for tracking progress of an iterative task.
Implements both string and HTML representations to allow richer
display in interfaces which support HTML output, for example Jupyter
notebooks or interactive terminals.
"""
GLYPHS = " ▏▎▍▌▋▊▉█"
"""Characters used to create string representation of progress bar."""
def __init__(
self,
n_step: int,
description: Optional[str] = None,
n_col: int = 10,
unit: str = "step",
min_refresh_time: float = 1.,
):
"""
:param n_step: Total number of steps in task.
:param description: Description of task to prefix progress bar with.
:param n_col: Number of columns (characters) to use in string
representation of progress bar.
:param unit: String describing unit of each step.
:param min_referesh_time: Minimum time in seconds between each
refresh of progress bar visual representation.
"""
assert int(n_step) == n_step and n_step > 0, "n_step must be positive integer"
self._n_step = int(n_step)
self._description = description
self._active = False
assert int(n_col) == n_col and n_col > 0, "n_col must be positive integer"
self._n_col = int(n_col)
self._unit = unit
self._capitalized_unit = unit.capitalize()
self._step = 0
self._start_time = None
self._elapsed_time = 0
self._stats_dict = {}
assert min_refresh_time >= 0, "min_refresh_time must be non-negative"
self._min_refresh_time = min_refresh_time
self._display = None
@property
def n_step(self):
"""Total number of steps in task."""
return self._n_step
@property
def description(self):
""""Description of task being tracked."""
return self._description
@property
def step(self):
"""Progress step count."""
return self._step
@step.setter
def step(self, value):
self._step = max(0, min(value, self.n_step))
@property
def prop_complete(self):
"""Proportion complete (float value in [0, 1])."""
return self.step / self.n_step
@property
def perc_complete(self):
"""Percentage complete formatted as string."""
return f"{int(self.prop_complete * 100):3d}%"
@property
def elapsed_time(self):
"""Elapsed time formatted as string."""
return _format_time(self._elapsed_time)
@property
def iter_rate(self):
"""Mean iteration rate if ≥ 1 `unit/s` or reciprocal `s/unit` as string."""
if self.prop_complete == 0:
return "?"
else:
mean_time = self._elapsed_time / self.step
return (
f"{mean_time:.2f}s/{self._unit}"
if mean_time > 1
else f"{1/mean_time:.2f}{self._unit}/s"
)
@property
def est_remaining_time(self):
"""Estimated remaining time to completion formatted as string."""
if self.prop_complete == 0:
return "?"
else:
return _format_time((1 / self.prop_complete - 1) * self._elapsed_time)
@property
def n_block_filled(self):
"""Number of filled blocks in progress bar."""
return int(self._n_col * self.prop_complete)
@property
def n_block_empty(self):
"""Number of empty blocks in progress bar."""
return self._n_col - self.n_block_filled
@property
def prop_partial_block(self):
"""Proportion filled in partial block in progress bar."""
return self._n_col * self.prop_complete - self.n_block_filled
@property
def filled_blocks(self):
"""Filled blocks string."""
return self.GLYPHS[-1] * self.n_block_filled
@property
def empty_blocks(self):
"""Empty blocks string."""
if self.prop_partial_block == 0:
return self.GLYPHS[0] * self.n_block_empty
else:
return self.GLYPHS[0] * (self.n_block_empty - 1)
@property
def partial_block(self):
"""Partial block character."""
if self.prop_partial_block == 0:
return ""
else:
return self.GLYPHS[int(len(self.GLYPHS) * self.prop_partial_block)]
@property
def progress_bar(self):
"""Progress bar string."""
return f"|{self.filled_blocks}{self.partial_block}{self.empty_blocks}|"
@property
def bar_color(self):
"""CSS color property for HTML progress bar."""
if self.step == self.n_step:
return "var(--jp-success-color1, #4caf50)"
elif self._active:
return "var(--jp-brand-color1, #2196f3)"
else:
return "var(--jp-error-color1, #f44336)"
@property
def stats(self):
"""Comma-delimited string list of statistic key=value pairs."""
return ", ".join(
f"{k}={_format_stat(v)}" for k, v in self._stats_dict.items()
)
@property
def prefix(self):
"""Text to prefix progress bar with."""
return (
f'{self.description + ": "if self.description else ""}'
f"{self.perc_complete}"
)
@property
def postfix(self):
"""Text to postfix progress bar with."""
return (
f"{self._capitalized_unit} {self.step}/{self.n_step} "
f"[{self.elapsed_time}<{self.est_remaining_time}, "
f"{self.iter_rate}"
f'{", " + self.stats if self._stats_dict else ""}]'
)
[docs]
def reset(self):
"""Reset progress bar state."""
self._step = 0
self._start_time = timer()
self._last_refresh_time = -float("inf")
self._stats_dict = {}
[docs]
def update(
self,
step: int,
stats_dict: Optional[Dict] = None,
refresh: bool = True
):
"""Update progress bar state.
:param step: New value for step counter.
:param stats_dict: Dictionary of statistic key-value pairs to use to
update postfix stats.
:param refresh: Whether to refresh display.
"""
if step == 0:
self.reset()
else:
self.step = step
if stats_dict is not None:
self._stats_dict.update(stats_dict)
self._elapsed_time = timer() - self._start_time
if (
refresh
and step == self.n_step
or (timer() - self._last_refresh_time > self._min_refresh_time)
):
self.refresh()
self._last_refresh_time = timer()
[docs]
def refresh(self):
"""Refresh visual display(s) of progress bar."""
self._display.update(self)
[docs]
def start(self):
"""Start tracking progress of task."""
self._active = True
self.reset()
if self._display is None:
self._display = _create_display(self)
[docs]
def stop(self):
"""Stop tracking progress of task."""
self._active = False
if self.step != self.n_step:
self.refresh()
if isinstance(self._display, StreamDisplay):
self._display.close()
def __str__(self):
return f"{self.prefix}{self.progress_bar}{self.postfix}"
def __repr__(self):
return self.__str__()
def _repr_html_(self):
return f"""
<div style="line-height: 28px; width: 100%; display: flex;
flex-flow: row wrap; align-items: center;
position: relative; margin: 2px;">
<label style="margin-right: 8px; flex-shrink: 0;
font-size: var(--jp-code-font-size, 13px);
font-family: var(--jp-code-font-family, monospace);">
{html.escape(self.prefix).replace(' ', ' ')}
</label>
<div role="progressbar" aria-valuenow="{self.prop_complete}"
aria-valuemin="0" aria-valuemax="1"
style="position: relative; flex-grow: 1; align-self: stretch;
margin-top: 4px; margin-bottom: 4px; height: initial;
background-color: #eee;">
<div style="background-color: {self.bar_color}; position: absolute;
bottom: 0; left: 0; width: {self.perc_complete};
height: 100%;"></div>
</div>
<div style="margin-left: 8px; flex-shrink: 0;
font-family: var(--jp-code-font-family, monospace);
font-size: var(--jp-code-font-size, 13px);">
{html.escape(self.postfix)}
</div>
</div>
"""
[docs]
class StreamDisplay:
"""Base class for using I/O streams as an updatable display."""
def __init__(self, io: Optional[TextIO] = None):
"""
:param io: I/O stream to write updates to. Defaults to `sys.stdout` if `None`.
"""
self._io = io if io is not None else sys.stdout
[docs]
def close(self):
self._io.write("\n")
self._io.flush()
[docs]
def update(self, obj):
"""Update display with string representation of `obj`."""
raise NotImplementedError()
[docs]
class AnsiStreamDisplay(StreamDisplay):
"""Use I/O stream which supports ANSI escape sequences as an updatable display."""
[docs]
def update(self, obj):
self._io.write("\x1b[2K\r")
self._io.write(str(obj))
self._io.flush()
[docs]
class BasicStreamDisplay(StreamDisplay):
"""Use I/O stream without ANSI escape sequence support as an updatable display."""
def __init__(self, io: Optional[TextIO] = None):
super().__init__(io)
self._last_string_length = 0
[docs]
def update(self, obj):
string = str(obj)
self._io.write(f"\r{string: <{self._last_string_length}}")
self._last_string_length = len(string)
self._io.flush()