direct_url.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. """ PEP 610 """
  2. import json
  3. import re
  4. import urllib.parse
  5. from typing import Any, Dict, Iterable, Optional, Type, TypeVar, Union
  6. __all__ = [
  7. "DirectUrl",
  8. "DirectUrlValidationError",
  9. "DirInfo",
  10. "ArchiveInfo",
  11. "VcsInfo",
  12. ]
  13. T = TypeVar("T")
  14. DIRECT_URL_METADATA_NAME = "direct_url.json"
  15. ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  16. class DirectUrlValidationError(Exception):
  17. pass
  18. def _get(d, expected_type, key, default=None):
  19. # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
  20. """Get value from dictionary and verify expected type."""
  21. if key not in d:
  22. return default
  23. value = d[key]
  24. if not isinstance(value, expected_type):
  25. raise DirectUrlValidationError(
  26. "{!r} has unexpected type for {} (expected {})".format(
  27. value, key, expected_type
  28. )
  29. )
  30. return value
  31. def _get_required(d, expected_type, key, default=None):
  32. # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
  33. value = _get(d, expected_type, key, default)
  34. if value is None:
  35. raise DirectUrlValidationError(f"{key} must have a value")
  36. return value
  37. def _exactly_one_of(infos):
  38. # type: (Iterable[Optional[InfoType]]) -> InfoType
  39. infos = [info for info in infos if info is not None]
  40. if not infos:
  41. raise DirectUrlValidationError(
  42. "missing one of archive_info, dir_info, vcs_info"
  43. )
  44. if len(infos) > 1:
  45. raise DirectUrlValidationError(
  46. "more than one of archive_info, dir_info, vcs_info"
  47. )
  48. assert infos[0] is not None
  49. return infos[0]
  50. def _filter_none(**kwargs):
  51. # type: (Any) -> Dict[str, Any]
  52. """Make dict excluding None values."""
  53. return {k: v for k, v in kwargs.items() if v is not None}
  54. class VcsInfo:
  55. name = "vcs_info"
  56. def __init__(
  57. self,
  58. vcs, # type: str
  59. commit_id, # type: str
  60. requested_revision=None, # type: Optional[str]
  61. resolved_revision=None, # type: Optional[str]
  62. resolved_revision_type=None, # type: Optional[str]
  63. ):
  64. self.vcs = vcs
  65. self.requested_revision = requested_revision
  66. self.commit_id = commit_id
  67. self.resolved_revision = resolved_revision
  68. self.resolved_revision_type = resolved_revision_type
  69. @classmethod
  70. def _from_dict(cls, d):
  71. # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
  72. if d is None:
  73. return None
  74. return cls(
  75. vcs=_get_required(d, str, "vcs"),
  76. commit_id=_get_required(d, str, "commit_id"),
  77. requested_revision=_get(d, str, "requested_revision"),
  78. resolved_revision=_get(d, str, "resolved_revision"),
  79. resolved_revision_type=_get(d, str, "resolved_revision_type"),
  80. )
  81. def _to_dict(self):
  82. # type: () -> Dict[str, Any]
  83. return _filter_none(
  84. vcs=self.vcs,
  85. requested_revision=self.requested_revision,
  86. commit_id=self.commit_id,
  87. resolved_revision=self.resolved_revision,
  88. resolved_revision_type=self.resolved_revision_type,
  89. )
  90. class ArchiveInfo:
  91. name = "archive_info"
  92. def __init__(
  93. self,
  94. hash=None, # type: Optional[str]
  95. ):
  96. self.hash = hash
  97. @classmethod
  98. def _from_dict(cls, d):
  99. # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
  100. if d is None:
  101. return None
  102. return cls(hash=_get(d, str, "hash"))
  103. def _to_dict(self):
  104. # type: () -> Dict[str, Any]
  105. return _filter_none(hash=self.hash)
  106. class DirInfo:
  107. name = "dir_info"
  108. def __init__(
  109. self,
  110. editable=False, # type: bool
  111. ):
  112. self.editable = editable
  113. @classmethod
  114. def _from_dict(cls, d):
  115. # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
  116. if d is None:
  117. return None
  118. return cls(
  119. editable=_get_required(d, bool, "editable", default=False)
  120. )
  121. def _to_dict(self):
  122. # type: () -> Dict[str, Any]
  123. return _filter_none(editable=self.editable or None)
  124. InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  125. class DirectUrl:
  126. def __init__(
  127. self,
  128. url, # type: str
  129. info, # type: InfoType
  130. subdirectory=None, # type: Optional[str]
  131. ):
  132. self.url = url
  133. self.info = info
  134. self.subdirectory = subdirectory
  135. def _remove_auth_from_netloc(self, netloc):
  136. # type: (str) -> str
  137. if "@" not in netloc:
  138. return netloc
  139. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  140. if (
  141. isinstance(self.info, VcsInfo) and
  142. self.info.vcs == "git" and
  143. user_pass == "git"
  144. ):
  145. return netloc
  146. if ENV_VAR_RE.match(user_pass):
  147. return netloc
  148. return netloc_no_user_pass
  149. @property
  150. def redacted_url(self):
  151. # type: () -> str
  152. """url with user:password part removed unless it is formed with
  153. environment variables as specified in PEP 610, or it is ``git``
  154. in the case of a git URL.
  155. """
  156. purl = urllib.parse.urlsplit(self.url)
  157. netloc = self._remove_auth_from_netloc(purl.netloc)
  158. surl = urllib.parse.urlunsplit(
  159. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  160. )
  161. return surl
  162. def validate(self):
  163. # type: () -> None
  164. self.from_dict(self.to_dict())
  165. @classmethod
  166. def from_dict(cls, d):
  167. # type: (Dict[str, Any]) -> DirectUrl
  168. return DirectUrl(
  169. url=_get_required(d, str, "url"),
  170. subdirectory=_get(d, str, "subdirectory"),
  171. info=_exactly_one_of(
  172. [
  173. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  174. DirInfo._from_dict(_get(d, dict, "dir_info")),
  175. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  176. ]
  177. ),
  178. )
  179. def to_dict(self):
  180. # type: () -> Dict[str, Any]
  181. res = _filter_none(
  182. url=self.redacted_url,
  183. subdirectory=self.subdirectory,
  184. )
  185. res[self.info.name] = self.info._to_dict()
  186. return res
  187. @classmethod
  188. def from_json(cls, s):
  189. # type: (str) -> DirectUrl
  190. return cls.from_dict(json.loads(s))
  191. def to_json(self):
  192. # type: () -> str
  193. return json.dumps(self.to_dict(), sort_keys=True)