spinners.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import contextlib
  2. import itertools
  3. import logging
  4. import sys
  5. import time
  6. from typing import IO, Iterator
  7. from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
  8. from pip._internal.utils.compat import WINDOWS
  9. from pip._internal.utils.logging import get_indentation
  10. logger = logging.getLogger(__name__)
  11. class SpinnerInterface:
  12. def spin(self):
  13. # type: () -> None
  14. raise NotImplementedError()
  15. def finish(self, final_status):
  16. # type: (str) -> None
  17. raise NotImplementedError()
  18. class InteractiveSpinner(SpinnerInterface):
  19. def __init__(
  20. self,
  21. message,
  22. file=None,
  23. spin_chars="-\\|/",
  24. # Empirically, 8 updates/second looks nice
  25. min_update_interval_seconds=0.125,
  26. ):
  27. # type: (str, IO[str], str, float) -> None
  28. self._message = message
  29. if file is None:
  30. file = sys.stdout
  31. self._file = file
  32. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  33. self._finished = False
  34. self._spin_cycle = itertools.cycle(spin_chars)
  35. self._file.write(" " * get_indentation() + self._message + " ... ")
  36. self._width = 0
  37. def _write(self, status):
  38. # type: (str) -> None
  39. assert not self._finished
  40. # Erase what we wrote before by backspacing to the beginning, writing
  41. # spaces to overwrite the old text, and then backspacing again
  42. backup = "\b" * self._width
  43. self._file.write(backup + " " * self._width + backup)
  44. # Now we have a blank slate to add our status
  45. self._file.write(status)
  46. self._width = len(status)
  47. self._file.flush()
  48. self._rate_limiter.reset()
  49. def spin(self):
  50. # type: () -> None
  51. if self._finished:
  52. return
  53. if not self._rate_limiter.ready():
  54. return
  55. self._write(next(self._spin_cycle))
  56. def finish(self, final_status):
  57. # type: (str) -> None
  58. if self._finished:
  59. return
  60. self._write(final_status)
  61. self._file.write("\n")
  62. self._file.flush()
  63. self._finished = True
  64. # Used for dumb terminals, non-interactive installs (no tty), etc.
  65. # We still print updates occasionally (once every 60 seconds by default) to
  66. # act as a keep-alive for systems like Travis-CI that take lack-of-output as
  67. # an indication that a task has frozen.
  68. class NonInteractiveSpinner(SpinnerInterface):
  69. def __init__(self, message, min_update_interval_seconds=60):
  70. # type: (str, float) -> None
  71. self._message = message
  72. self._finished = False
  73. self._rate_limiter = RateLimiter(min_update_interval_seconds)
  74. self._update("started")
  75. def _update(self, status):
  76. # type: (str) -> None
  77. assert not self._finished
  78. self._rate_limiter.reset()
  79. logger.info("%s: %s", self._message, status)
  80. def spin(self):
  81. # type: () -> None
  82. if self._finished:
  83. return
  84. if not self._rate_limiter.ready():
  85. return
  86. self._update("still running...")
  87. def finish(self, final_status):
  88. # type: (str) -> None
  89. if self._finished:
  90. return
  91. self._update(f"finished with status '{final_status}'")
  92. self._finished = True
  93. class RateLimiter:
  94. def __init__(self, min_update_interval_seconds):
  95. # type: (float) -> None
  96. self._min_update_interval_seconds = min_update_interval_seconds
  97. self._last_update = 0 # type: float
  98. def ready(self):
  99. # type: () -> bool
  100. now = time.time()
  101. delta = now - self._last_update
  102. return delta >= self._min_update_interval_seconds
  103. def reset(self):
  104. # type: () -> None
  105. self._last_update = time.time()
  106. @contextlib.contextmanager
  107. def open_spinner(message):
  108. # type: (str) -> Iterator[SpinnerInterface]
  109. # Interactive spinner goes directly to sys.stdout rather than being routed
  110. # through the logging system, but it acts like it has level INFO,
  111. # i.e. it's only displayed if we're at level INFO or better.
  112. # Non-interactive spinner goes through the logging system, so it is always
  113. # in sync with logging configuration.
  114. if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
  115. spinner = InteractiveSpinner(message) # type: SpinnerInterface
  116. else:
  117. spinner = NonInteractiveSpinner(message)
  118. try:
  119. with hidden_cursor(sys.stdout):
  120. yield spinner
  121. except KeyboardInterrupt:
  122. spinner.finish("canceled")
  123. raise
  124. except Exception:
  125. spinner.finish("error")
  126. raise
  127. else:
  128. spinner.finish("done")
  129. @contextlib.contextmanager
  130. def hidden_cursor(file):
  131. # type: (IO[str]) -> Iterator[None]
  132. # The Windows terminal does not support the hide/show cursor ANSI codes,
  133. # even via colorama. So don't even try.
  134. if WINDOWS:
  135. yield
  136. # We don't want to clutter the output with control characters if we're
  137. # writing to a file, or if the user is running with --quiet.
  138. # See https://github.com/pypa/pip/issues/3418
  139. elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
  140. yield
  141. else:
  142. file.write(HIDE_CURSOR)
  143. try:
  144. yield
  145. finally:
  146. file.write(SHOW_CURSOR)