#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Depends from fastapi import Request from fastapi.responses import RedirectResponse, PlainTextResponse from sqlalchemy.orm import Session from database import get_db import hashlib import uuid from common import security from models import * from common.auth_user import * from common import YzyApi from config import settings from extensions import logger import requests from exceptions import * from urllib.parse import quote from utils import * from utils.redis_util import * from datetime import timedelta router = APIRouter() @router.get("/tyrz/login") async def login( *, request: Request, code: str, db: Session = Depends(get_db) ): logger.info("统一认证登录 code: {}", code) print(request.client.host) if code is None or code == '': return PlainTextResponse("统一身份证失败,原因:取code错误") get_token_url = settings.TYRZ_GET_TOKEN logger.debug("get_token_url: {}", get_token_url) access_token = '' userid = '' mobile = "" sfzh = "" try: headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "client_id": settings.TYRZ_CLIENT_ID, "grant_type": "authorization_code", "redirect_uri": settings.TYRZ_REDIRECT_URI, "code": code, "client_secret": settings.TYRZ_CLIENT_SECRET } print('data', data) response = requests.post(get_token_url, data=data, headers=headers, timeout=15) print(response.text) if response.status_code == 200 : result = response.json() status = int(result['status']) if status == 0: data = result['data'] access_token = data['access_token'] expires_in = data['expires_in'] userid = data['expires_in'] else: message = result['message'] return PlainTextResponse("统一身份证失败,原因:"+message) data = { "access_token": access_token } get_token_info_url = settings.TYRZ_GET_TOKEN_INFO response = requests.post(get_token_info_url, data=data, timeout=60) print(response.text) if response.status_code == 200 : result = response.json() status = int(result['status']) if status == 0: data = result['data'] mobile = data['mobile'] name = data['name'] sfzh = data['certificateNumber'] else: message = result['message'] return PlainTextResponse("统一身份证失败,原因:"+message) except Exception as e: return PlainTextResponse("统一身份证超时,请稍后再试。") row = db.query(SysUser).filter_by(SysUser.yzy_account == mobile).first() if row is None: logger.error("没有匹配的账号绑定用户。") user = {"username": name, "mobile": mobile} return {} user_id = str(row.user_id) auth = { "user_id": user_id, "user_name": row.user_name, "nick_name": row.nick_name, "is_yzy_user": "1" } request.session['user_auth'] = auth request.session['user_auth_sign'] = data_auth_sign(auth) request.session['user_name'] = row.user_name # db_czrz_serv.log_username(db, row.uid, row.username, "登录", "后台管理账号+密码登录成功", request.client.host) row.login_date = datetime.now() row.login_ip = request.client.host # row.login = row.login + 1 db.commit() access_token_expires = timedelta(seconds = 7200) access_token = security.create_access_token( data={"sub": user_id}, expires_delta = access_token_expires ) refresh_token_expires = timedelta(seconds = 7200) refresh_token = security.create_access_token( data={"sub": user_id}, expires_delta = refresh_token_expires ) return RedirectResponse(url="/yjzp/#/login?token="+access_token)