Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add file-like pager: click.get_pager_file() #1572

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Unreleased
- When generating a command's name from a decorated function's name, the
suffixes ``_command``, ``_cmd``, ``_group``, and ``_grp`` are removed.
:issue:`2322`
- Add ``click.get_pager_file`` for file-like access to an output
pager. :pr:`1572`


Version 8.1.8
Expand Down
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Utilities

.. autofunction:: echo_via_pager

.. autofunction:: get_pager_file

.. autofunction:: prompt

.. autofunction:: confirm
Expand Down
12 changes: 12 additions & 0 deletions docs/utils.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ If you want to use the pager for a lot of text, especially if generating everyth
click.echo_via_pager(_generate_output())


For more complex programs, which can't easily use a simple generator, you
can get access to a writable file-like object for the pager, and write to
that instead:

.. click:example::
@click.command()
def less():
with click.get_pager_file() as pager:
for idx in range(50000):
print(idx, file=pager)


Screen Clearing
---------------

Expand Down
1 change: 1 addition & 0 deletions src/click/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .termui import confirm as confirm
from .termui import echo_via_pager as echo_via_pager
from .termui import edit as edit
from .termui import get_pager_file as get_pager_file
from .termui import getchar as getchar
from .termui import launch as launch
from .termui import pause as pause
Expand Down
4 changes: 4 additions & 0 deletions src/click/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,10 @@ def should_strip_ansi(
if color is None:
if stream is None:
stream = sys.stdin
elif hasattr(stream, "color"):
# ._termui_impl.MaybeStripAnsi handles stripping ansi itself,
# so we don't need to strip it here
return False
return not isatty(stream) and not _is_jupyter_kernel_output(stream)
return not color

Expand Down
136 changes: 80 additions & 56 deletions src/click/_termui_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import collections.abc as cabc
import contextlib
import io
import math
import os
import sys
Expand All @@ -21,7 +22,6 @@
from ._compat import CYGWIN
from ._compat import get_best_encoding
from ._compat import isatty
from ._compat import open_stream
from ._compat import strip_ansi
from ._compat import term_len
from ._compat import WIN
Expand Down Expand Up @@ -361,7 +361,20 @@ def generator(self) -> cabc.Iterator[V]:
self.render_progress()


def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
class MaybeStripAnsi(io.TextIOWrapper):
def __init__(self, stream: t.IO[bytes], *, color: bool, **kwargs: t.Any):
super().__init__(stream, **kwargs)
self.color = color

def write(self, text: str) -> int:
if not self.color:
text = strip_ansi(text)
return super().write(text)


def _pager_contextmanager(
color: bool | None = None,
) -> t.ContextManager[t.Tuple[t.BinaryIO, str, bool]]:
"""Decide what method to use for paging through text."""
stdout = _default_text_stdout()

Expand All @@ -371,32 +384,51 @@ def pager(generator: cabc.Iterable[str], color: bool | None = None) -> None:
stdout = StringIO()

if not isatty(sys.stdin) or not isatty(stdout):
return _nullpager(stdout, generator, color)
return _nullpager(stdout, color)
pager_cmd = (os.environ.get("PAGER", None) or "").strip()
if pager_cmd:
if WIN:
return _tempfilepager(generator, pager_cmd, color)
return _pipepager(generator, pager_cmd, color)
return _tempfilepager(pager_cmd, color)
return _pipepager(pager_cmd, color)
if os.environ.get("TERM") in ("dumb", "emacs"):
return _nullpager(stdout, generator, color)
return _nullpager(stdout, color)
if WIN or sys.platform.startswith("os2"):
return _tempfilepager(generator, "more <", color)
return _tempfilepager("more <", color)
if hasattr(os, "system") and os.system("(less) 2>/dev/null") == 0:
return _pipepager(generator, "less", color)
return _pipepager("less", color)

import tempfile

fd, filename = tempfile.mkstemp()
os.close(fd)
try:
if hasattr(os, "system") and os.system(f'more "{filename}"') == 0:
return _pipepager(generator, "more", color)
return _nullpager(stdout, generator, color)
return _pipepager("more", color)
return _nullpager(stdout, color)
finally:
os.unlink(filename)


def _pipepager(generator: cabc.Iterable[str], cmd: str, color: bool | None) -> None:
@contextlib.contextmanager
def get_pager_file(color: bool | None = None) -> t.Generator[t.IO, None, None]:
"""Context manager.
Yields a writable file-like object which can be used as an output pager.
.. versionadded:: 8.2
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
with _pager_contextmanager(color=color) as (stream, encoding, color):
if not getattr(stream, "encoding", None):
# wrap in a text stream
stream = MaybeStripAnsi(stream, color=color, encoding=encoding)
yield stream
stream.flush()


@contextlib.contextmanager
def _pipepager(
cmd: str, color: bool | None
) -> t.Iterator[t.Tuple[t.BinaryIO, str, bool]]:
"""Page through text by feeding it to another program. Invoking a
pager through this might support colors.
"""
Expand All @@ -415,19 +447,17 @@ def _pipepager(generator: cabc.Iterable[str], cmd: str, color: bool | None) -> N
elif "r" in less_flags or "R" in less_flags:
color = True

if color is None:
color = False

c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
stdin = t.cast(t.BinaryIO, c.stdin)
encoding = get_best_encoding(stdin)
try:
for text in generator:
if not color:
text = strip_ansi(text)

stdin.write(text.encode(encoding, "replace"))
except (OSError, KeyboardInterrupt):
pass
else:
stdin.close()
with stdin:
try:
yield stdin, encoding, color
except (OSError, KeyboardInterrupt):
pass

# Less doesn't respect ^C, but catches it for its own UI purposes (aborting
# search or other commands inside less).
Expand All @@ -446,33 +476,27 @@ def _pipepager(generator: cabc.Iterable[str], cmd: str, color: bool | None) -> N
break


def _tempfilepager(generator: cabc.Iterable[str], cmd: str, color: bool | None) -> None:
@contextlib.contextmanager
def _tempfilepager(
cmd: str, color: bool | None = None
) -> t.Iterator[t.Tuple[t.BinaryIO, str, bool]]:
"""Page through text by invoking a program on a temporary file."""
import tempfile

fd, filename = tempfile.mkstemp()
# TODO: This never terminates if the passed generator never terminates.
text = "".join(generator)
if not color:
text = strip_ansi(text)
encoding = get_best_encoding(sys.stdout)
with open_stream(filename, "wb")[0] as f:
f.write(text.encode(encoding))
try:
os.system(f'{cmd} "{filename}"')
finally:
os.close(fd)
os.unlink(filename)
with tempfile.NamedTemporaryFile(mode="wb") as f:
yield f, encoding, color
f.flush()
os.system(f'{cmd} "{f.name}"')


@contextlib.contextmanager
def _nullpager(
stream: t.TextIO, generator: cabc.Iterable[str], color: bool | None
) -> None:
stream: t.TextIO, color: bool | None = None
) -> t.Iterator[t.Tuple[t.BinaryIO, str, bool]]:
"""Simply print unformatted text. This is the ultimate fallback."""
for text in generator:
if not color:
text = strip_ansi(text)
stream.write(text)
encoding = get_best_encoding(stream)
yield stream, encoding, color


class Editor:
Expand Down Expand Up @@ -613,23 +637,23 @@ def _unquote_file(url: str) -> str:
wait_str = "-w" if wait else ""
args = f'cygstart {wait_str} "{url}"'
return os.system(args)

try:
if locate:
url = os.path.dirname(_unquote_file(url)) or "."
else:
url = _unquote_file(url)
c = subprocess.Popen(["xdg-open", url])
if wait:
return c.wait()
return 0
except OSError:
if url.startswith(("http://", "https://")) and not locate and not wait:
import webbrowser

webbrowser.open(url)
else:
try:
if locate:
url = os.path.dirname(_unquote_file(url)) or "."
else:
url = _unquote_file(url)
c = subprocess.Popen(["xdg-open", url])
if wait:
return c.wait()
return 0
return 1
except OSError:
if url.startswith(("http://", "https://")) and not locate and not wait:
import webbrowser

webbrowser.open(url)
return 0
return 1


def _translate_ch_to_exc(ch: str) -> None:
Expand Down
24 changes: 20 additions & 4 deletions src/click/termui.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,23 @@ def confirm(
return rv


def get_pager_file(color=None):
"""Context manager.

Yields a writable file-like object which can be used as an output pager.

.. versionadded:: 8.2

:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
from ._termui_impl import get_pager_file

color = resolve_color_default(color)

return get_pager_file(color=color)


def echo_via_pager(
text_or_generator: cabc.Iterable[str] | t.Callable[[], cabc.Iterable[str]] | str,
color: bool | None = None,
Expand All @@ -267,7 +284,6 @@ def echo_via_pager(
:param color: controls if the pager supports ANSI colors or not. The
default is autodetection.
"""
color = resolve_color_default(color)

if inspect.isgeneratorfunction(text_or_generator):
i = t.cast("t.Callable[[], cabc.Iterable[str]]", text_or_generator)()
Expand All @@ -279,9 +295,9 @@ def echo_via_pager(
# convert every element of i to a text type if necessary
text_generator = (el if isinstance(el, str) else str(el) for el in i)

from ._termui_impl import pager

return pager(itertools.chain(text_generator, "\n"), color)
with get_pager_file(color=color) as pager:
for text in itertools.chain(text_generator, "\n"):
pager.write(text)


def progressbar(
Expand Down
Loading