|
- from fastapi import APIRouter, Request, Depends, HTTPException, Query, Header
- from sqlalchemy.exc import IntegrityError
- from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, RedirectResponse
- from fastapi.responses import JSONResponse
- from starlette.requests import HTTPConnection
- from database import get_db
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import func
- from models import *
- import json
- import random
- from sqlalchemy import create_engine, select
- from typing import Optional
- from utils.StripTagsHTMLParser import *
- from common.db import db_event_management, db_user, db_area, db_emergency_plan
- from common.security import valid_access_token
- import traceback
- from utils import *
- from datetime import datetime, timedelta
- import qrcode
- from PIL import Image
- from io import BytesIO
- from config import settings
- from common import YzyApi
- from extensions import logger
- from common.enc import mpfun
- from urllib.parse import quote
- from utils.redis_util import *
- router = APIRouter()
@router.get('/event/checkin', response_class=StreamingResponse)
async def get_qrcode(
    request: Request,
    event_id: str,
    db: Session = Depends(get_db)
):
    """Render a PNG QR code pointing at the public check-in entry URL.

    The encoded URL is built from ``settings.YZY_WEB_ROOT`` rather than from
    the incoming request, and targets ``/api/qrcode/event/checkin2`` with the
    given ``event_id`` plus an ``r`` parameter taken from ``rnd()``.

    Args:
        request: Incoming request; only its URL is logged.
        event_id: Event identifier embedded in the QR target URL.
        db: Database session dependency (not used by this handler).

    Returns:
        A ``StreamingResponse`` with media type ``image/png``.
    """
    logger.info('url: {}', str(request.url))

    # Percent-encode the identifier so characters such as '&' or '#' cannot
    # alter the query string of the URL baked into the QR code.
    safe_event_id = quote(event_id, safe='')
    url = settings.YZY_WEB_ROOT + f"/api/qrcode/event/checkin2?event_id={safe_event_id}&r=" + rnd()
    logger.info('替换到互联网ip: {}', url)

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_M,
        box_size=10,
        border=0,
    )
    qr.add_data(url)
    # fit=True lets the library pick a larger version if the URL does not fit.
    qr.make(fit=True)
    image = qr.make_image()

    buf = BytesIO()
    image.save(buf, 'png')
    # Rewind so the response streams the image from its first byte.
    buf.seek(0)
    return StreamingResponse(buf, media_type="image/png")
@router.get('/event/checkin2')
async def get_qrcode2(
    event_id: str,
    user_agent: str = Header(default=''),
    db: Session = Depends(get_db)
):
    """Redirect a scanned QR code to the right sign-in entry point.

    When the ``User-Agent`` header contains ``wxworklocal`` the request is
    treated as a YZY client scan and is sent to the YZY user-authorization
    page, which is asked to call back ``/api/qrcode/event/callback``.
    Any other client is sent straight to the ``/yjxp/#/signPage`` page.

    Args:
        event_id: Event identifier forwarded to the next page.
        user_agent: Value of the ``User-Agent`` request header.
        db: Database session dependency (not used by this handler).

    Returns:
        A ``RedirectResponse`` to the selected URL.
    """
    logger.info('user_agent: {}', user_agent)

    # Percent-encode the identifier so it cannot inject or truncate
    # parameters in the URLs assembled below.
    safe_event_id = quote(event_id, safe='')

    if "wxworklocal" in user_agent:
        logger.info("粤政易扫码")
        # The scanning user is not necessarily a user of this system; the
        # authorization round trip is only used to obtain their YZY profile.
        redirect_uri = quote(f"{settings.YZY_WEB_ROOT}/api/qrcode/event/callback?event_id={safe_event_id}")
        state = "signin"

        # YZY user-authorization page
        detail_url = f"https://xtbg.gdzwfw.gov.cn/zwwxgzt/pf/userpermit/index.html?redirect_uri={redirect_uri}&response_type=code&appid={settings.YZY_AGENTID}&state={state}"
    else:
        logger.info("微信扫码")
        redirect_url = f"/yjxp/#/signPage?event_id={safe_event_id}"  # business page
        detail_url = f"{settings.YZY_WEB_ROOT}{redirect_url}"

    logger.info("detail_url: {}", detail_url)
    return RedirectResponse(detail_url)
@router.get('/event/callback')
async def get_qrcode2(
    event_id: str,
    state: str,
    code: str,
    db: Session = Depends(get_db)
):
    """Handle the YZY authorization callback and hand off to the sign page.

    Looks the user up via ``YzyApi.getuserbycode(code)``, caches a profile
    dict under the Redis key ``yzy_user_<uuid>`` and redirects the browser to
    ``/yjxp/#/signPage`` with ``event_id`` and that ``uuid``. When the lookup
    reports a non-zero ``errcode`` an empty profile is cached instead.

    NOTE(review): this function reuses the name ``get_qrcode2`` of the
    ``/event/checkin2`` handler, so the module-level name is rebound here.
    The name is left unchanged to avoid touching the route's identity.

    Args:
        event_id: Event identifier carried through the authorization flow.
        state: State value echoed back by the authorization page (unused).
        code: Authorization code exchanged for the user's profile.
        db: Database session dependency (not used by this handler).

    Returns:
        A ``RedirectResponse`` to the sign page.
    """
    uuid_str = new_guid()

    # Empty defaults, used when the user lookup does not succeed.
    redis_val = {
        "event_id": event_id,
        "nick_name": '',
        "dept_name": '',
        "phone": '',
        "duties": '',
        "sign_time": ''
    }

    # Exchange the authorization code for the user's basic information.
    result = YzyApi.getuserbycode(code)
    errcode = int(result['errcode'])
    if errcode == 0:
        data = result['data']

        dept_list = []
        for unit in data['units']:
            unitid = unit['unitid']
            new_dept = {
                "dept": unit['unitname'],
                "position": ""
            }
            # A department belongs to this unit when the unit id occurs in
            # its id path; the last matching department's position wins.
            for dept in data['depts']:
                if unitid in dept['unitidpath']:
                    new_dept['position'] = dept['position']
            dept_list.append(new_dept)

        phone = ''
        try:
            # The mobile number is delivered encrypted (DES symmetric
            # encryption); a failed decrypt leaves the phone empty.
            phone = YzyApi.desDecryptValue(settings.YZY_CORPSECRET, data['mobile'])
        except Exception:
            # Best effort: keep going without a phone number, but do not
            # swallow SystemExit / KeyboardInterrupt like a bare except would.
            traceback.print_exc()

        dept_name = dept_list[0]['dept'] if dept_list else ''
        duties = dept_list[0]['position'] if dept_list else ''

        redis_val = {
            "event_id": event_id,
            "user_id": data['userid'],
            "nick_name": data['username'],
            "phone": phone,
            "dept_name": dept_name,
            "duties": duties,
            "sign_time": '',
            "dept_list": dept_list
        }

    # Third argument is presumably a TTL in seconds -- confirm in redis_util.
    redis_set_json(f"yzy_user_{uuid_str}", redis_val, 60)

    # Percent-encode the identifier so it cannot inject an extra parameter
    # (for example a forged uuid) into the redirect target.
    safe_event_id = quote(event_id, safe='')
    redirect_url = f"/yjxp/#/signPage?event_id={safe_event_id}&uuid={uuid_str}"  # business page
    detail_url = f"{settings.YZY_WEB_ROOT}{redirect_url}"
    logger.info("detail_url: {}", detail_url)
    return RedirectResponse(detail_url)
@router.get("/yzy_user_info")
async def yzy_user_info(
    request: Request,
    uuid: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json)
):
    """Return the YZY user profile cached under ``yzy_user_<uuid>``.

    Args:
        request: Incoming request (not used by this handler).
        uuid: Identifier appended to the ``yzy_user_`` Redis key prefix.
        db: Database session dependency (not used by this handler).
        body: Result of the ``remove_xss_json`` dependency (not used here).

    Returns:
        ``{"code": 0, "msg": "", "data": <cached dict>}`` when the entry
        exists; otherwise an explicit error payload with ``code`` 500, so the
        caller never receives a bare ``null`` body.
    """
    redis_val = redis_get_json(f"yzy_user_{uuid}")
    if redis_val is None:
        # The entry is missing (unknown uuid or already expired).
        return {
            "code": 500,
            "msg": "用户信息不存在或已过期",
            "data": None
        }

    # NOTE(review): the cached value can include the user's phone number;
    # consider whether it should be written to the log.
    logger.info(redis_val)
    return {
        "code": 0,
        "msg": "",
        "data": redis_val
    }
@router.post("/check")
async def check(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json)
):
    """Record or cancel an event check-in for the submitted phone number.

    ``body`` must provide ``event_id``, ``nick_name``, ``dept_name``,
    ``phone``, ``duties`` and ``type``. ``type == '1'`` creates or refreshes
    the ``EventCheckin`` row for (event_id, phone); ``type == '2'`` marks the
    row as deleted (``del_flag = '1'``) if it exists.

    Args:
        request: Incoming request (not used by this handler).
        db: Database session used for the lookups and commits.
        body: Request payload produced by the ``remove_xss_json`` dependency.

    Returns:
        A dict with ``code`` 200 and a message (plus ``data.sign_time`` for a
        check-in), or a ``code`` 500 payload for an unsupported ``type``.

    Raises:
        KeyError: If ``body`` lacks one of the required keys.
    """
    import asyncio

    # Fixed 2-second delay; awaited so it does not block the event loop and
    # stall every other request served by this worker.
    await asyncio.sleep(2.0)

    event_id = body['event_id']
    nick_name = body['nick_name']
    dept_name = body['dept_name']
    phone = body['phone']
    duties = body['duties']
    type_ = body['type']

    dept_id = 0
    yzy_account = ''

    # Match the phone against active emergency contacts; the stored value is
    # compared with the output of mpfun.enc_data(phone).
    contact_info = db.query(EmergencyContactInfo).filter(
        and_(
            EmergencyContactInfo.del_flag == "0",
            EmergencyContactInfo.yue_gov_ease_phone == mpfun.enc_data(phone)
        )
    ).first()
    if contact_info is not None:
        yzy_account = phone
        contact_info = get_model_dict(contact_info)
        dept_id = contact_info['unit_id']

    # A check-in row is identified by event and phone.
    checkin_filter = and_(EventCheckin.event_id == event_id, EventCheckin.phone == phone)

    if type_ == '1':
        # Check in
        row = db.query(EventCheckin).filter(checkin_filter).first()
        if row is None:
            row = EventCheckin(
                event_id=event_id,
                user_id=0,
                user_name='',
                nick_name=nick_name,
                dept_id=dept_id,
                dept_name=dept_name,
                sign_time=datetime.now(),
                yzy_account=yzy_account,
                duties=duties,
                phone=phone,
                del_flag='0'
            )
            db.add(row)
            db.commit()
            db.refresh(row)
        else:
            # Re-activate and refresh an existing row.
            row.sign_time = datetime.now()
            row.nick_name = nick_name
            row.dept_name = dept_name
            row.duties = duties
            row.phone = phone
            row.del_flag = '0'
            db.commit()

        return {
            'code': 200,
            'msg': '签到成功',
            'data': {
                'sign_time': get_datetime_str(row.sign_time)
            }
        }

    if type_ == '2':
        # Cancel the check-in; a missing row is still reported as success.
        row = db.query(EventCheckin).filter(checkin_filter).first()
        if row is not None:
            row.sign_time = datetime.now()
            row.del_flag = '1'
            db.commit()

        return {
            'code': 200,
            'msg': '签退成功'
        }

    # Any other type previously fell through and produced a null body.
    return {
        'code': 500,
        'msg': '不支持的操作类型'
    }
|