progress_bars.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. import itertools
  2. import sys
  3. from signal import SIGINT, default_int_handler, signal
  4. from typing import Any, Dict, List
  5. from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
  6. from pip._vendor.progress.spinner import Spinner
  7. from pip._internal.utils.compat import WINDOWS
  8. from pip._internal.utils.logging import get_indentation
  9. from pip._internal.utils.misc import format_size
  10. try:
  11. from pip._vendor import colorama
  12. # Lots of different errors can come from this, including SystemError and
  13. # ImportError.
  14. except Exception:
  15. colorama = None
  16. def _select_progress_class(preferred, fallback):
  17. # type: (Bar, Bar) -> Bar
  18. encoding = getattr(preferred.file, "encoding", None)
  19. # If we don't know what encoding this file is in, then we'll just assume
  20. # that it doesn't support unicode and use the ASCII bar.
  21. if not encoding:
  22. return fallback
  23. # Collect all of the possible characters we want to use with the preferred
  24. # bar.
  25. characters = [
  26. getattr(preferred, "empty_fill", ""),
  27. getattr(preferred, "fill", ""),
  28. ]
  29. characters += list(getattr(preferred, "phases", []))
  30. # Try to decode the characters we're using for the bar using the encoding
  31. # of the given file, if this works then we'll assume that we can use the
  32. # fancier bar and if not we'll fall back to the plaintext bar.
  33. try:
  34. "".join(characters).encode(encoding)
  35. except UnicodeEncodeError:
  36. return fallback
  37. else:
  38. return preferred
  39. _BaseBar = _select_progress_class(IncrementalBar, Bar) # type: Any
  40. class InterruptibleMixin:
  41. """
  42. Helper to ensure that self.finish() gets called on keyboard interrupt.
  43. This allows downloads to be interrupted without leaving temporary state
  44. (like hidden cursors) behind.
  45. This class is similar to the progress library's existing SigIntMixin
  46. helper, but as of version 1.2, that helper has the following problems:
  47. 1. It calls sys.exit().
  48. 2. It discards the existing SIGINT handler completely.
  49. 3. It leaves its own handler in place even after an uninterrupted finish,
  50. which will have unexpected delayed effects if the user triggers an
  51. unrelated keyboard interrupt some time after a progress-displaying
  52. download has already completed, for example.
  53. """
  54. def __init__(self, *args, **kwargs):
  55. # type: (List[Any], Dict[Any, Any]) -> None
  56. """
  57. Save the original SIGINT handler for later.
  58. """
  59. # https://github.com/python/mypy/issues/5887
  60. super().__init__(*args, **kwargs) # type: ignore
  61. self.original_handler = signal(SIGINT, self.handle_sigint)
  62. # If signal() returns None, the previous handler was not installed from
  63. # Python, and we cannot restore it. This probably should not happen,
  64. # but if it does, we must restore something sensible instead, at least.
  65. # The least bad option should be Python's default SIGINT handler, which
  66. # just raises KeyboardInterrupt.
  67. if self.original_handler is None:
  68. self.original_handler = default_int_handler
  69. def finish(self):
  70. # type: () -> None
  71. """
  72. Restore the original SIGINT handler after finishing.
  73. This should happen regardless of whether the progress display finishes
  74. normally, or gets interrupted.
  75. """
  76. super().finish() # type: ignore
  77. signal(SIGINT, self.original_handler)
  78. def handle_sigint(self, signum, frame): # type: ignore
  79. """
  80. Call self.finish() before delegating to the original SIGINT handler.
  81. This handler should only be in place while the progress display is
  82. active.
  83. """
  84. self.finish()
  85. self.original_handler(signum, frame)
  86. class SilentBar(Bar):
  87. def update(self):
  88. # type: () -> None
  89. pass
  90. class BlueEmojiBar(IncrementalBar):
  91. suffix = "%(percent)d%%"
  92. bar_prefix = " "
  93. bar_suffix = " "
  94. phases = ("\U0001F539", "\U0001F537", "\U0001F535")
  95. class DownloadProgressMixin:
  96. def __init__(self, *args, **kwargs):
  97. # type: (List[Any], Dict[Any, Any]) -> None
  98. # https://github.com/python/mypy/issues/5887
  99. super().__init__(*args, **kwargs) # type: ignore
  100. self.message = (" " * (get_indentation() + 2)) + self.message # type: str
  101. @property
  102. def downloaded(self):
  103. # type: () -> str
  104. return format_size(self.index) # type: ignore
  105. @property
  106. def download_speed(self):
  107. # type: () -> str
  108. # Avoid zero division errors...
  109. if self.avg == 0.0: # type: ignore
  110. return "..."
  111. return format_size(1 / self.avg) + "/s" # type: ignore
  112. @property
  113. def pretty_eta(self):
  114. # type: () -> str
  115. if self.eta: # type: ignore
  116. return f"eta {self.eta_td}" # type: ignore
  117. return ""
  118. def iter(self, it): # type: ignore
  119. for x in it:
  120. yield x
  121. # B305 is incorrectly raised here
  122. # https://github.com/PyCQA/flake8-bugbear/issues/59
  123. self.next(len(x)) # noqa: B305
  124. self.finish()
  125. class WindowsMixin:
  126. def __init__(self, *args, **kwargs):
  127. # type: (List[Any], Dict[Any, Any]) -> None
  128. # The Windows terminal does not support the hide/show cursor ANSI codes
  129. # even with colorama. So we'll ensure that hide_cursor is False on
  130. # Windows.
  131. # This call needs to go before the super() call, so that hide_cursor
  132. # is set in time. The base progress bar class writes the "hide cursor"
  133. # code to the terminal in its init, so if we don't set this soon
  134. # enough, we get a "hide" with no corresponding "show"...
  135. if WINDOWS and self.hide_cursor: # type: ignore
  136. self.hide_cursor = False
  137. # https://github.com/python/mypy/issues/5887
  138. super().__init__(*args, **kwargs) # type: ignore
  139. # Check if we are running on Windows and we have the colorama module,
  140. # if we do then wrap our file with it.
  141. if WINDOWS and colorama:
  142. self.file = colorama.AnsiToWin32(self.file) # type: ignore
  143. # The progress code expects to be able to call self.file.isatty()
  144. # but the colorama.AnsiToWin32() object doesn't have that, so we'll
  145. # add it.
  146. self.file.isatty = lambda: self.file.wrapped.isatty()
  147. # The progress code expects to be able to call self.file.flush()
  148. # but the colorama.AnsiToWin32() object doesn't have that, so we'll
  149. # add it.
  150. self.file.flush = lambda: self.file.wrapped.flush()
  151. class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin, DownloadProgressMixin):
  152. file = sys.stdout
  153. message = "%(percent)d%%"
  154. suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
  155. class DefaultDownloadProgressBar(BaseDownloadProgressBar, _BaseBar):
  156. pass
  157. class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):
  158. pass
  159. class DownloadBar(BaseDownloadProgressBar, Bar):
  160. pass
  161. class DownloadFillingCirclesBar(BaseDownloadProgressBar, FillingCirclesBar):
  162. pass
  163. class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, BlueEmojiBar):
  164. pass
  165. class DownloadProgressSpinner(
  166. WindowsMixin, InterruptibleMixin, DownloadProgressMixin, Spinner
  167. ):
  168. file = sys.stdout
  169. suffix = "%(downloaded)s %(download_speed)s"
  170. def next_phase(self):
  171. # type: () -> str
  172. if not hasattr(self, "_phaser"):
  173. self._phaser = itertools.cycle(self.phases)
  174. return next(self._phaser)
  175. def update(self):
  176. # type: () -> None
  177. message = self.message % self
  178. phase = self.next_phase()
  179. suffix = self.suffix % self
  180. line = "".join(
  181. [
  182. message,
  183. " " if message else "",
  184. phase,
  185. " " if suffix else "",
  186. suffix,
  187. ]
  188. )
  189. self.writeln(line)
  190. BAR_TYPES = {
  191. "off": (DownloadSilentBar, DownloadSilentBar),
  192. "on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
  193. "ascii": (DownloadBar, DownloadProgressSpinner),
  194. "pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
  195. "emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner),
  196. }
  197. def DownloadProgressProvider(progress_bar, max=None): # type: ignore
  198. if max is None or max == 0:
  199. return BAR_TYPES[progress_bar][1]().iter
  200. else:
  201. return BAR_TYPES[progress_bar][0](max=max).iter