|
@@ -0,0 +1,128 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+from fastapi import APIRouter, Depends
|
|
|
+from fastapi import Request
|
|
|
+from fastapi.responses import RedirectResponse, PlainTextResponse
|
|
|
+from sqlalchemy.orm import Session
|
|
|
+from database import get_db
|
|
|
+import hashlib
|
|
|
+import uuid
|
|
|
+from common import security
|
|
|
+from models import *
|
|
|
+from common.auth_user import *
|
|
|
+from common import YzyApi
|
|
|
+from config import settings
|
|
|
+from extensions import logger
|
|
|
+import requests
|
|
|
+from exceptions import *
|
|
|
+from urllib.parse import quote
|
|
|
+from utils import *
|
|
|
+from utils.redis_util import *
|
|
|
+from datetime import timedelta
|
|
|
+
|
|
|
+router = APIRouter()
|
|
|
+
|
|
|
+@router.get("/tyrz/login")
|
|
|
+async def login(
|
|
|
+ *,
|
|
|
+ request: Request,
|
|
|
+ code: str,
|
|
|
+ db: Session = Depends(get_db)
|
|
|
+):
|
|
|
+ logger.info("统一认证登录 code: {}", code)
|
|
|
+
|
|
|
+ print(request.client.host)
|
|
|
+
|
|
|
+ if code is None or code == '':
|
|
|
+ return PlainTextResponse("统一身份证失败,原因:取code错误")
|
|
|
+
|
|
|
+ get_token_url = settings.TYRZ_GET_TOKEN
|
|
|
+ logger.debug("get_token_url: {}", get_token_url)
|
|
|
+
|
|
|
+ access_token = ''
|
|
|
+ userid = ''
|
|
|
+ mobile = ""
|
|
|
+ sfzh = ""
|
|
|
+
|
|
|
+ try:
|
|
|
+ headers = {
|
|
|
+ "Content-Type": "application/x-www-form-urlencoded"
|
|
|
+ }
|
|
|
+ data = {
|
|
|
+ "client_id": settings.TYRZ_CLIENT_ID,
|
|
|
+ "grant_type": "authorization_code",
|
|
|
+ "redirect_uri": settings.TYRZ_REDIRECT_URI,
|
|
|
+ "code": code,
|
|
|
+ "client_secret": settings.TYRZ_CLIENT_SECRET
|
|
|
+ }
|
|
|
+ print('data', data)
|
|
|
+ response = requests.post(get_token_url, data=data, headers=headers, timeout=15)
|
|
|
+ print(response.text)
|
|
|
+ if response.status_code == 200 :
|
|
|
+ result = response.json()
|
|
|
+ status = int(result['status'])
|
|
|
+ if status == 0:
|
|
|
+ data = result['data']
|
|
|
+ access_token = data['access_token']
|
|
|
+ expires_in = data['expires_in']
|
|
|
+ userid = data['expires_in']
|
|
|
+ else:
|
|
|
+ message = result['message']
|
|
|
+ return PlainTextResponse("统一身份证失败,原因:"+message)
|
|
|
+
|
|
|
+ data = {
|
|
|
+ "access_token": access_token
|
|
|
+ }
|
|
|
+ get_token_info_url = settings.TYRZ_GET_TOKEN_INFO
|
|
|
+ response = requests.post(get_token_info_url, data=data, timeout=60)
|
|
|
+ print(response.text)
|
|
|
+ if response.status_code == 200 :
|
|
|
+ result = response.json()
|
|
|
+ status = int(result['status'])
|
|
|
+ if status == 0:
|
|
|
+ data = result['data']
|
|
|
+ mobile = data['mobile']
|
|
|
+ name = data['name']
|
|
|
+ sfzh = data['certificateNumber']
|
|
|
+ else:
|
|
|
+ message = result['message']
|
|
|
+ return PlainTextResponse("统一身份证失败,原因:"+message)
|
|
|
+ except Exception as e:
|
|
|
+ return PlainTextResponse("统一身份证超时,请稍后再试。")
|
|
|
+
|
|
|
+ row = db.query(SysUser).filter_by(SysUser.yzy_account == mobile).first()
|
|
|
+ if row is None:
|
|
|
+ logger.error("没有匹配的账号绑定用户。")
|
|
|
+ user = {"username": name, "mobile": mobile}
|
|
|
+ return {}
|
|
|
+
|
|
|
+ user_id = str(row.user_id)
|
|
|
+
|
|
|
+ auth = {
|
|
|
+ "user_id": user_id,
|
|
|
+ "user_name": row.user_name,
|
|
|
+ "nick_name": row.nick_name,
|
|
|
+ "is_yzy_user": "1"
|
|
|
+ }
|
|
|
+
|
|
|
+ request.session['user_auth'] = auth
|
|
|
+ request.session['user_auth_sign'] = data_auth_sign(auth)
|
|
|
+ request.session['user_name'] = row.user_name
|
|
|
+
|
|
|
+ # db_czrz_serv.log_username(db, row.uid, row.username, "登录", "后台管理账号+密码登录成功", request.client.host)
|
|
|
+ row.login_date = datetime.now()
|
|
|
+ row.login_ip = request.client.host
|
|
|
+ # row.login = row.login + 1
|
|
|
+ db.commit()
|
|
|
+
|
|
|
+ access_token_expires = timedelta(seconds = 7200)
|
|
|
+ access_token = security.create_access_token(
|
|
|
+ data={"sub": user_id}, expires_delta = access_token_expires
|
|
|
+ )
|
|
|
+
|
|
|
+ refresh_token_expires = timedelta(seconds = 7200)
|
|
|
+ refresh_token = security.create_access_token(
|
|
|
+ data={"sub": user_id}, expires_delta = refresh_token_expires
|
|
|
+ )
|
|
|
+
|
|
|
+ return RedirectResponse(url="/yjzp/#/login?token="+access_token)
|