#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Unified-authentication (TYRZ) login callback route."""
# NOTE: the import order below is kept exactly as it was. Several wildcard
# imports follow one another, so reordering could change which name wins.
from fastapi import APIRouter, Depends, Query
from fastapi import Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import RedirectResponse, PlainTextResponse
from sqlalchemy.orm import Session
from database import get_db
import hashlib
import uuid
from common import security
from models import *
from common.auth_user import *
from common import YzyApi
from config import settings
from extensions import logger
import requests
from exceptions import *
from urllib.parse import quote
from utils import *
from utils.redis_util import *
from datetime import timedelta
import traceback

router = APIRouter()

# Prefix of every "authentication failed" message returned to the browser.
_FAIL_PREFIX = "统一身份证失败,原因:"
# Timeouts passed to requests.post for the two provider calls.
_TOKEN_TIMEOUT = 15
_TOKEN_INFO_TIMEOUT = 60
# Expiry handed to redis_set_with_time for the one-time login code
# (presumably seconds -- confirm against utils.redis_util).
_LOGIN_CODE_TTL = 600


class _TyrzLoginError(Exception):
    """The provider rejected the request; str(exc) is the reason shown to the user."""


def _post_tyrz_form(url, form, *, timeout, headers=None):
    """POST form data to a provider endpoint and return the reply's ``data`` field.

    Raises:
        _TyrzLoginError: the HTTP status is not 200, or the reply's ``status``
            field is non-zero (the reply's ``message`` becomes the reason).
    """
    response = requests.post(url, data=form, headers=headers, timeout=timeout)
    # Only the status is logged: the body carries access tokens / personal data.
    logger.debug("统一身份证 response: HTTP {} from {}", response.status_code, url)
    if response.status_code != 200:
        raise _TyrzLoginError(f"HTTP {response.status_code}")

    result = response.json()
    if int(result['status']) != 0:
        raise _TyrzLoginError(result['message'])
    return result['data']


def _fetch_tyrz_mobile(code):
    """Exchange the authorization code for a token and return the user's mobile.

    Blocking (uses ``requests``); call it from a worker thread.
    """
    token_url = settings.TYRZ_GET_TOKEN
    logger.debug("get_token_url: {}", token_url)

    token_form = {
        "client_id": settings.TYRZ_CLIENT_ID,
        "grant_type": "authorization_code",
        "redirect_uri": settings.TYRZ_REDIRECT_URI,
        "code": code,
        "client_secret": settings.TYRZ_CLIENT_SECRET,
    }
    # Never log the client secret.
    logger.debug("data: {}", {**token_form, "client_secret": "***"})
    token_data = _post_tyrz_form(
        token_url,
        token_form,
        timeout=_TOKEN_TIMEOUT,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )

    profile = _post_tyrz_form(
        settings.TYRZ_GET_TOKEN_INFO,
        {"access_token": token_data['access_token']},
        timeout=_TOKEN_INFO_TIMEOUT,
    )
    return profile['mobile']


@router.get("/tyrz/login")
async def login(
    *,
    request: Request,
    code: str,
    src: str = Query(None),
    redirect: str = Query(None),
    db: Session = Depends(get_db)
):
    """Handle the unified-authentication callback and redirect to the front end.

    The authorization ``code`` is exchanged for the user's mobile number, which
    is matched against ``SysUser.yzy_account``. On a match, the user id is
    stored in Redis under a fresh one-time code and the browser is redirected
    to ``/{src}/#/yzylogin``; without a match it is redirected to
    ``/{src}/#/noyzyuser``.

    Args:
        request: Incoming request (used only to log the client host).
        code: Authorization code issued by the provider.
        src: Front-end path segment used to build the redirect target.
        redirect: Optional target forwarded to the front end as ``redirect``.
        db: SQLAlchemy session.

    Returns:
        A ``RedirectResponse`` on completion, or a ``PlainTextResponse`` with
        an error message when the provider call fails.
    """
    logger.info("统一认证登录 code: {}, src: {}, redirect: {}", code, src, redirect)
    client = request.client  # may be None, depending on the server/transport
    logger.debug("client host: {}", client.host if client else None)

    if not code:
        return PlainTextResponse("统一身份证失败,原因:取code错误")

    try:
        # requests is blocking; run it off the event loop.
        mobile = await run_in_threadpool(_fetch_tyrz_mobile, code)
    except _TyrzLoginError as exc:
        return PlainTextResponse(f"{_FAIL_PREFIX}{exc}")
    except Exception:
        # Boundary catch: network errors, malformed replies, missing fields.
        logger.exception("统一身份证调用异常")
        return PlainTextResponse("统一身份证超时,请稍后再试。")

    # An empty mobile must never be used as a lookup key: it could match an
    # account whose yzy_account is empty.
    row = None
    if mobile:
        row = db.query(SysUser).filter(SysUser.yzy_account == mobile).first()

    # NOTE(review): src is caller-controlled and interpolated into the redirect
    # path unvalidated (None yields "/None/..."); consider an allow-list.
    if row is None:
        logger.error("没有匹配的账号绑定用户。")
        return RedirectResponse(url=f"/{src}/#/noyzyuser")

    # Store the user id under a one-time code the front end trades for a session.
    login_code = new_guid()
    redis_set_with_time("yzy_" + login_code, str(row.user_id), _LOGIN_CODE_TTL)

    goto_url = f"/{src}/#/yzylogin?code=" + login_code
    if redirect is not None:
        # Percent-encode so '&', '#', '?' in the target cannot break the query.
        goto_url += "&redirect=" + quote(redirect, safe="")
    logger.info("goto_url: {}", goto_url)
    return RedirectResponse(url=goto_url)