zwrz.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Depends, Query
  4. from fastapi import Request
  5. from fastapi.responses import RedirectResponse, PlainTextResponse
  6. from sqlalchemy.orm import Session
  7. from database import get_db
  8. import hashlib
  9. import uuid
  10. from common import security
  11. from models import *
  12. from common.auth_user import *
  13. from common import YzyApi
  14. from config import settings
  15. from extensions import logger
  16. import requests
  17. from exceptions import *
  18. from urllib.parse import quote
  19. from utils import *
  20. from utils.redis_util import *
  21. from datetime import timedelta
  22. import traceback
  23. router = APIRouter()
  24. @router.get("/tyrz/login")
  25. async def login(
  26. *,
  27. request: Request,
  28. code: str,
  29. state: str = Query(None),
  30. db: Session = Depends(get_db)
  31. ):
  32. logger.info("统一认证登录 code: {}, redirect: {}", code, state)
  33. print(request.client.host)
  34. if code is None or code == '':
  35. return PlainTextResponse("统一身份证失败,原因:取code错误")
  36. get_token_url = settings.TYRZ_GET_TOKEN
  37. logger.debug("get_token_url: {}", get_token_url)
  38. access_token = ''
  39. userid = ''
  40. mobile = ""
  41. sfzh = ""
  42. try:
  43. headers = {
  44. "Content-Type": "application/x-www-form-urlencoded"
  45. }
  46. data = {
  47. "client_id": settings.TYRZ_CLIENT_ID,
  48. "grant_type": "authorization_code",
  49. "redirect_uri": settings.TYRZ_REDIRECT_URI,
  50. "code": code,
  51. "client_secret": settings.TYRZ_CLIENT_SECRET
  52. }
  53. print('data:', data)
  54. response = requests.post(get_token_url, data=data, headers=headers, timeout=15)
  55. print("统一身份证 response:", response.text)
  56. if response.status_code == 200 :
  57. result = response.json()
  58. status = int(result['status'])
  59. if status == 0:
  60. data = result['data']
  61. access_token = data['access_token']
  62. expires_in = data['expires_in']
  63. userid = data['expires_in']
  64. else:
  65. message = result['message']
  66. return PlainTextResponse("统一身份证失败,原因:"+message)
  67. data = {
  68. "access_token": access_token
  69. }
  70. get_token_info_url = settings.TYRZ_GET_TOKEN_INFO
  71. response = requests.post(get_token_info_url, data=data, timeout=60)
  72. print(response.text)
  73. if response.status_code == 200 :
  74. result = response.json()
  75. status = int(result['status'])
  76. if status == 0:
  77. data = result['data']
  78. userId = data['userId'] # 用户粤政易ID
  79. mobile = data['mobile']
  80. name = data['name']
  81. sfzh = data['certificateNumber']
  82. #units = data['units']
  83. #if len(units) > 0:
  84. # unitPath = units['0']['unitPath']
  85. else:
  86. message = result['message']
  87. return PlainTextResponse("统一身份证失败,原因:"+message)
  88. except Exception as e:
  89. traceback.print_exc()
  90. return PlainTextResponse("统一身份证超时,请稍后再试。")
  91. row = db.query(SysUser).filter(SysUser.yzy_account == mobile).first()
  92. if row is None:
  93. logger.error("没有匹配的账号绑定用户。")
  94. user = {"username": name, "mobile": mobile}
  95. goto_url = f"/{state}/#/noyzyuser"
  96. return RedirectResponse(url=goto_url)
  97. # 保存user_id
  98. code = new_guid()
  99. redis_set_with_time("yzy_" + code, str(row.user_id), 600)
  100. if state == 'yjzp':
  101. goto_url = f"/{state}/#/yzylogin?code={code}"
  102. elif state == 'yjdp':
  103. goto_url = f"https://yjdp.mmsyjj.cn:8086/{state}/#/yzylogin?code={code}"
  104. logger.info("goto_url: {}", goto_url)
  105. return RedirectResponse(url=goto_url)