123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Depends
- from fastapi import Request
- from fastapi.responses import RedirectResponse, PlainTextResponse
- from sqlalchemy.orm import Session
- from database import get_db
- import hashlib
- import uuid
- from common import security
- from models import *
- from common.auth_user import *
- from common import YzyApi
- from config import settings
- from extensions import logger
- import requests
- from exceptions import *
- from urllib.parse import quote
- from utils import *
- from utils.redis_util import *
- from datetime import timedelta
- router = APIRouter()
- @router.get("/tyrz/login")
- async def login(
- *,
- request: Request,
- code: str,
- db: Session = Depends(get_db)
- ):
- logger.info("统一认证登录 code: {}", code)
- print(request.client.host)
-
- if code is None or code == '':
- return PlainTextResponse("统一身份证失败,原因:取code错误")
- get_token_url = settings.TYRZ_GET_TOKEN
- logger.debug("get_token_url: {}", get_token_url)
- access_token = ''
- userid = ''
- mobile = ""
- sfzh = ""
- try:
- headers = {
- "Content-Type": "application/x-www-form-urlencoded"
- }
- data = {
- "client_id": settings.TYRZ_CLIENT_ID,
- "grant_type": "authorization_code",
- "redirect_uri": settings.TYRZ_REDIRECT_URI,
- "code": code,
- "client_secret": settings.TYRZ_CLIENT_SECRET
- }
- print('data', data)
- response = requests.post(get_token_url, data=data, headers=headers, timeout=15)
- print(response.text)
- if response.status_code == 200 :
- result = response.json()
- status = int(result['status'])
- if status == 0:
- data = result['data']
- access_token = data['access_token']
- expires_in = data['expires_in']
- userid = data['expires_in']
- else:
- message = result['message']
- return PlainTextResponse("统一身份证失败,原因:"+message)
-
- data = {
- "access_token": access_token
- }
- get_token_info_url = settings.TYRZ_GET_TOKEN_INFO
- response = requests.post(get_token_info_url, data=data, timeout=60)
- print(response.text)
- if response.status_code == 200 :
- result = response.json()
- status = int(result['status'])
- if status == 0:
- data = result['data']
- mobile = data['mobile']
- name = data['name']
- sfzh = data['certificateNumber']
- else:
- message = result['message']
- return PlainTextResponse("统一身份证失败,原因:"+message)
- except Exception as e:
- return PlainTextResponse("统一身份证超时,请稍后再试。")
-
- row = db.query(SysUser).filter_by(SysUser.yzy_account == mobile).first()
- if row is None:
- logger.error("没有匹配的账号绑定用户。")
- user = {"username": name, "mobile": mobile}
- return {}
- user_id = str(row.user_id)
- auth = {
- "user_id": user_id,
- "user_name": row.user_name,
- "nick_name": row.nick_name,
- "is_yzy_user": "1"
- }
- request.session['user_auth'] = auth
- request.session['user_auth_sign'] = data_auth_sign(auth)
- request.session['user_name'] = row.user_name
- # db_czrz_serv.log_username(db, row.uid, row.username, "登录", "后台管理账号+密码登录成功", request.client.host)
- row.login_date = datetime.now()
- row.login_ip = request.client.host
- # row.login = row.login + 1
- db.commit()
- access_token_expires = timedelta(seconds = 7200)
- access_token = security.create_access_token(
- data={"sub": user_id}, expires_delta = access_token_expires
- )
- refresh_token_expires = timedelta(seconds = 7200)
- refresh_token = security.create_access_token(
- data={"sub": user_id}, expires_delta = refresh_token_expires
- )
- return RedirectResponse(url="/yjzp/#/login?token="+access_token)
|