zwrz.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Depends, Query
  4. from fastapi import Request
  5. from fastapi.responses import RedirectResponse, PlainTextResponse
  6. from sqlalchemy.orm import Session
  7. from database import get_db
  8. import hashlib
  9. import uuid
  10. from common import security
  11. from models import *
  12. from common.auth_user import *
  13. from common import YzyApi
  14. from config import settings
  15. from extensions import logger
  16. import requests
  17. from exceptions import *
  18. from urllib.parse import quote
  19. from utils import *
  20. from utils.redis_util import *
  21. from datetime import timedelta
  22. import traceback
  23. router = APIRouter()
  24. @router.get("/tyrz/login")
  25. async def login(
  26. *,
  27. request: Request,
  28. code: str,
  29. src: str = Query(None),
  30. redirect: str = Query(None),
  31. db: Session = Depends(get_db)
  32. ):
  33. logger.info("统一认证登录 code: {}, redirect: {}", code, src, redirect)
  34. print(request.client.host)
  35. if code is None or code == '':
  36. return PlainTextResponse("统一身份证失败,原因:取code错误")
  37. get_token_url = settings.TYRZ_GET_TOKEN
  38. logger.debug("get_token_url: {}", get_token_url)
  39. access_token = ''
  40. userid = ''
  41. mobile = ""
  42. sfzh = ""
  43. try:
  44. headers = {
  45. "Content-Type": "application/x-www-form-urlencoded"
  46. }
  47. data = {
  48. "client_id": settings.TYRZ_CLIENT_ID,
  49. "grant_type": "authorization_code",
  50. "redirect_uri": settings.TYRZ_REDIRECT_URI,
  51. "code": code,
  52. "client_secret": settings.TYRZ_CLIENT_SECRET
  53. }
  54. print('data:', data)
  55. response = requests.post(get_token_url, data=data, headers=headers, timeout=15)
  56. print("统一身份证 response:", response.text)
  57. if response.status_code == 200 :
  58. result = response.json()
  59. status = int(result['status'])
  60. if status == 0:
  61. data = result['data']
  62. access_token = data['access_token']
  63. expires_in = data['expires_in']
  64. userid = data['expires_in']
  65. else:
  66. message = result['message']
  67. return PlainTextResponse("统一身份证失败,原因:"+message)
  68. data = {
  69. "access_token": access_token
  70. }
  71. get_token_info_url = settings.TYRZ_GET_TOKEN_INFO
  72. response = requests.post(get_token_info_url, data=data, timeout=60)
  73. print(response.text)
  74. if response.status_code == 200 :
  75. result = response.json()
  76. status = int(result['status'])
  77. if status == 0:
  78. data = result['data']
  79. userId = data['userId'] # 用户粤政易ID
  80. mobile = data['mobile']
  81. name = data['name']
  82. sfzh = data['certificateNumber']
  83. #units = data['units']
  84. #if len(units) > 0:
  85. # unitPath = units['0']['unitPath']
  86. else:
  87. message = result['message']
  88. return PlainTextResponse("统一身份证失败,原因:"+message)
  89. except Exception as e:
  90. traceback.print_exc()
  91. return PlainTextResponse("统一身份证超时,请稍后再试。")
  92. row = db.query(SysUser).filter(SysUser.yzy_account == mobile).first()
  93. if row is None:
  94. logger.error("没有匹配的账号绑定用户。")
  95. user = {"username": name, "mobile": mobile}
  96. goto_url = f"/{src}/#/noyzyuser"
  97. return RedirectResponse(url=goto_url)
  98. # 保存user_id
  99. code = new_guid()
  100. redis_set_with_time("yzy_" + code, str(row.user_id), 600)
  101. goto_url = f"/{src}/#/yzylogin?code=" + code
  102. if redirect is not None:
  103. goto_url = goto_url + "&redirect="+redirect
  104. logger.info("goto_url: {}", goto_url)
  105. return RedirectResponse(url=goto_url)