# -*- coding: utf-8 -*-
"""Authentication routes: tenant list, captcha, password login, logout, YZY login.

NOTE(review): several names used below (``datetime``, ``logger``, ``settings``,
``md5``, ``new_guid``, ``getVerifyCode``, ``redis_get``, ``redis_set_with_time``,
``remove_xss_json``, ``data_auth_sign``, ``SysUser``, ``YzyOrgUserEntity``,
``VERIFICATION_CODE_REDIS_PREFIX``, ...) are not imported explicitly; they are
presumably provided by the wildcard imports. The original import order is kept
so that name resolution does not change.
"""
from fastapi import APIRouter, Depends, Request, Header, Form, Body
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from fastapi.responses import JSONResponse
from database import get_db
from utils import *
from utils.vcode import *
from utils.redis_util import *
import base64
import hmac
import json
from common.const import *
from io import BytesIO
from utils.StripTagsHTMLParser import *
from common import security
from datetime import timedelta
from common.security import valid_access_token
from common.auth_user import *
from common import YzyApi
from models import *
from urllib.parse import quote
import requests

router = APIRouter()

# Prefix put in front of the formatted verification-code key in Redis.
_CAPTCHA_KEY_PREFIX = "kaptcha_"
# Lifetime of a stored captcha answer, in seconds.
_CAPTCHA_TTL_SECONDS = 300
# Failed-login counter: lifetime in seconds and the threshold that blocks login.
_LOGIN_LOCK_SECONDS = 300
_MAX_LOGIN_FAILURES = 5
# Fixed client id echoed back by /login, kept only for possible compatibility.
_COMPAT_CLIENT_ID = "e5cd7e4891bf95d1d19206ce24a7b32e"


@router.get('/tenant/list')
async def tenant_list(
    request: Request,
    db: Session = Depends(get_db)
):
    """Return a fixed payload: tenants disabled and an empty tenant list."""
    return {
        "code": 200,
        "msg": "操作成功",
        "data": {
            "tenantEnabled": False,
            "voList": []
        }
    }


def buildVerificationCodeRedisKey(uuid: str) -> str:
    """Format ``uuid`` into ``VERIFICATION_CODE_REDIS_PREFIX``."""
    return VERIFICATION_CODE_REDIS_PREFIX.format(uuid)


@router.get('/code')
async def code(
    request: Request,
    db: Session = Depends(get_db)
):
    """Generate a captcha image and return it base64-encoded with its uuid.

    The expected answer is stored in Redis under a key derived from the uuid
    so that ``/login`` can verify it.
    """
    uuid_str = md5(new_guid())
    image, verify_code = getVerifyCode()

    buf = BytesIO()
    image.save(buf, 'png')
    base64_str = base64.b64encode(buf.getvalue()).decode('utf-8')

    redis_key = _CAPTCHA_KEY_PREFIX + buildVerificationCodeRedisKey(uuid_str)
    # Store with an explicit expiry so unanswered captchas do not accumulate.
    # NOTE(review): the previous call was redis_set(key, value) with no
    # expiry argument; confirm it did not already apply a different TTL.
    redis_set_with_time(redis_key, verify_code, _CAPTCHA_TTL_SECONDS)

    return {
        "code": 200,
        "msg": "操作成功",
        "data": {
            "captchaEnabled": True,
            "uuid": uuid_str,
            "img": base64_str
        }
    }


def _bad_credentials_response() -> JSONResponse:
    """Build the HTTP 404 response used for any wrong user name / password."""
    return JSONResponse(
        status_code=404,
        content={"code": 404, "msg": "帐号或者密码错误"},
    )


def _record_login_failure(redis_login_key: str, failures: int) -> JSONResponse:
    """Store the incremented failure counter and return the bad-credentials response."""
    redis_set_with_time(redis_login_key, str(failures + 1), _LOGIN_LOCK_SECONDS)
    return _bad_credentials_response()


def _passwords_match(stored, supplied) -> bool:
    """Compare two passwords in constant time; non-string values never match."""
    if not isinstance(stored, str) or not isinstance(supplied, str):
        # Also rejects the None == None case (NULL column + null in request).
        return False
    return hmac.compare_digest(stored.encode('utf-8'), supplied.encode('utf-8'))


def _issue_tokens(user_id: str, lifetime: timedelta):
    """Create the (access_token, refresh_token) pair for ``user_id``.

    Both tokens are produced by ``security.create_access_token`` with the
    same lifetime.
    """
    access_token = security.create_access_token(
        data={"sub": user_id}, expires_delta=lifetime
    )
    refresh_token = security.create_access_token(
        data={"sub": user_id}, expires_delta=lifetime
    )
    return access_token, refresh_token


def _start_session(request: Request, db: Session, row, session_user_name, is_yzy_user: str) -> str:
    """Write the signed auth info into the session and record the login on ``row``.

    Returns the user id as a string.
    """
    user_id = str(row.user_id)
    auth = {
        "user_id": user_id,
        "user_name": row.user_name,
        "nick_name": row.nick_name,
        "is_yzy_user": is_yzy_user
    }
    request.session['user_auth'] = auth
    request.session['user_auth_sign'] = data_auth_sign(auth)
    request.session['user_name'] = session_user_name

    row.login_date = datetime.now()
    row.login_ip = request.client.host
    db.commit()
    return user_id


@router.post('/login')
async def login(
    request: Request,
    db: Session = Depends(get_db),
    data: dict = Depends(remove_xss_json)
):
    """Log in with user name, password and captcha.

    Reads ``username``, ``password``, ``uuid`` and ``code`` from the request
    body; other fields a client may send (tenantId, rememberMe, clientId,
    grantType) are ignored. Returns a code-500 payload for a wrong captcha or
    too many failures, an HTTP 404 response for wrong credentials, and the
    token payload on success.
    """
    username = data.get('username')
    password = data.get('password')
    uuid_str = data.get('uuid')
    captcha = data.get('code')

    redis_key = _CAPTCHA_KEY_PREFIX + buildVerificationCodeRedisKey(uuid_str)
    redis_code = redis_get(redis_key)
    # NOTE(review): the captcha is not invalidated after a successful check.
    if captcha is None or captcha != redis_code:
        return {
            "code": 500,
            "msg": "图片验证码不正确",
        }

    if not isinstance(username, str):
        # Missing or malformed user name: same answer as unknown credentials.
        return _bad_credentials_response()

    redis_login_key = "login_user_" + username
    stored_failures = redis_get(redis_login_key)
    failures = 0 if stored_failures is None else int(stored_failures)
    if failures >= _MAX_LOGIN_FAILURES:
        return {
            "code": 500,
            "msg": "登录错误多,请5分钟后再尝试!",
        }

    row = db.query(SysUser).filter(SysUser.user_name == username).first()
    if row is None:
        return _record_login_failure(redis_login_key, failures)

    # NOTE(review): the stored password is compared directly with the
    # submitted value (no hashing step is visible here). Legacy checks that
    # used to follow (MD5 comparison, locking accounts unused for 30 days or
    # whose initial password was not changed within 72 hours) were disabled
    # in the original source and have been dropped.
    if not _passwords_match(row.password, password):
        return _record_login_failure(redis_login_key, failures)

    if failures:
        # Successful login clears the failure counter (expires after 1 second).
        redis_set_with_time(redis_login_key, str(0), 1)

    user_id = _start_session(request, db, row, username, "0")
    access_token, refresh_token = _issue_tokens(user_id, timedelta(days=5))

    # NOTE(review): the reported expiry (7200) does not match the 5-day token
    # lifetime above; left unchanged because clients may rely on it.
    return {
        "code": 200,
        "msg": "操作成功",
        "data": {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expire_in": 7200,
            "refresh_expire_in": 7200,
            "client_id": _COMPAT_CLIENT_ID,
            "scope": "",
            "openid": ""
        }
    }


@router.post('/logout')
async def logout(
    request: Request,
    db: Session = Depends(get_db),
    user: AuthUser = Depends(get_auth_user)
):
    """Clear the session and return the URL the client should go to next."""
    logger.info("logout ok")
    request.session.clear()

    home_url = settings.HOME_URL + "/yjzp/"
    logout_url = home_url
    try:
        # The session stores the flag as the string "1"; accept the int too.
        if user.is_yzy_user in (1, "1"):
            logout_url = settings.TYRZ_LOGOUT.format(settings.TYRZ_CLIENT_ID) + quote(home_url)
            logger.info(logout_url)
    except Exception:
        # Best effort: fall back to the home page if the URL cannot be built.
        logger.exception("failed to build logout url")
        logout_url = home_url

    return {
        "code": 200,
        "msg": "退出成功",
        "data": logout_url
    }


# Small-screen clients only
@router.post('/yzy/callback')
async def yzy(
    request: Request,
    db: Session = Depends(get_db),
    data: dict = Depends(remove_xss_json)
):
    """Exchange a YZY authorization code for tokens.

    ``state`` must be base64-encoded JSON containing ``redirect_url``, which
    is echoed back in the response. Every failure is reported as a code-500
    payload.
    """
    auth_code = data.get('code')
    try:
        state_str = base64.b64decode(data.get('state')).decode('utf-8')
        state_json = json.loads(state_str)
        redirect_url = state_json['redirect_url']
    except (TypeError, ValueError, KeyError):
        # Missing, non-base64, non-UTF-8, non-JSON state, or no redirect_url.
        return {
            "code": 500,
            "msg": "state异常"
        }

    logger.info("code:{}, {}", auth_code, state_json)

    resp = YzyApi.get_user_info(auth_code)
    if resp['errcode'] != 0:
        return {
            "code": 500,
            "msg": "Code异常"
        }

    org_user = db.query(YzyOrgUserEntity).filter(YzyOrgUserEntity.userid == resp['UserId']).first()
    if org_user is None:
        return {
            "code": 500,
            "msg": "user_id异常"
        }

    row = db.query(SysUser).filter(SysUser.yzy_account == org_user.account).first()
    if row is None:
        return {
            "code": 500,
            "msg": "用户不是本系统用户"
        }

    access_token, refresh_token = _issue_tokens(str(row.user_id), timedelta(seconds=7200))

    return {
        "code": 200,
        "msg": "粤政易登录成功",
        "data": {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expire_in": 7200,
            "refresh_expire_in": 7200,
            "scope": "",
            "redirect_url": redirect_url
        }
    }


# YZY token login
# NOTE: this handler shares the name ``login`` with the password handler
# above. Both routes stay registered because the decorator registers the
# function when it is applied; the name is kept for backward compatibility.
@router.post('/yzylogin')
async def login(
    request: Request,
    data: dict = Depends(remove_xss_json),
    db: Session = Depends(get_db)
):
    """Log in with a code whose user id was stored in Redis under ``yzy_<code>``."""
    not_a_user = {
        "code": 500,
        "msg": "用户不是本系统用户"
    }

    login_code = data.get('code')
    if not isinstance(login_code, str):
        return not_a_user

    cached_user_id = redis_get("yzy_" + login_code)
    if cached_user_id is None:
        return not_a_user

    row = db.query(SysUser).filter(SysUser.user_id == int(cached_user_id)).first()
    if row is None:
        return not_a_user

    user_id = _start_session(request, db, row, row.user_name, "1")
    access_token, refresh_token = _issue_tokens(user_id, timedelta(days=5))

    # NOTE(review): the reported expiry (7200) does not match the 5-day token
    # lifetime above; left unchanged because clients may rely on it.
    return {
        "code": 200,
        "msg": "操作成功",
        "data": {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "expire_in": 7200,
            "refresh_expire_in": 7200,
            "client_id": 0,
            "scope": "",
            "openid": ""
        }
    }