from fastapi import APIRouter, Request, Depends, HTTPException, Query, Header,status from sqlalchemy.exc import IntegrityError from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, RedirectResponse from fastapi.responses import JSONResponse from starlette.requests import HTTPConnection from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from sqlalchemy.sql import func from models import * import json import random from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from common.db import db_event_management, db_user, db_area, db_emergency_plan from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta import qrcode from PIL import Image from io import BytesIO from config import settings from common import YzyApi from extensions import logger router = APIRouter() ''' @router.get('/qrcode', response_class=StreamingResponse) async def get_qrcode( request: Request, event_id: str, db: Session = Depends(get_db) ): url = str(request.url) print('url:', url) url = url.replace("qrcode", "qrcode2") + "&r="+rnd() url = url.replace("http://" + request.url.hostname, settings.YZY_WEB_ROOT) logger.info('替换到互联网ip: {}', url) qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_M, box_size=10, border=0, ) qr.add_data(url) qr.make(fit=True) image = qr.make_image() buf = BytesIO() image.save(buf, 'png') img_data = buf.getvalue() return StreamingResponse(BytesIO(img_data), media_type="image/png") @router.get('/qrcode2') async def get_qrcode2( event_id: str, user_agent: str = Header(default=''), db: Session = Depends(get_db) ): print('user_agent:', user_agent) redirect_url = "/yjxp/#/signPage?event_id={}".format(event_id) # 业务页面 # if "wxworklocal" in user_agent: # logger.info("redirect_url: {}", redirect_url) # qrcode_str = f"http://19.155.220.209/api/event_management/event?event_id={event_id}" detail_url = YzyApi.format_redirect_url(redirect_url) else: # 120.241.74.139 # detail_url = "http://yjxp.tjp.com.cn:8086/yjxp/#" + redirect_url #detail_url = "{}{}".format(settings.YZY_WEB_ROOT, redirect_url) return RedirectResponse(detail_url) ''' @router.post("/getInfo") async def getInfo( request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): event_id = body['event_id'] row = db.query(SysUser).filter(SysUser.user_id == user_id).first() user_info = get_model_dict(row) yzy_account = user_info['yzy_account'] contact_info = db.query(EmergencyContactInfo).filter(and_(EmergencyContactInfo.del_flag == "0", EmergencyContactInfo.yue_gov_ease_phone == yzy_account)).first() contact_info = get_model_dict(contact_info) nick_name = contact_info['contact_name'] dept_id = contact_info['unit_id'] dept_name = contact_info['unit_name'] duties = contact_info['position'] # row = db.query(SysDept).filter(SysDept.dept_id == user_info['dept_id']).first() # dept_info = get_model_dict(row) user_name = user_info['user_name'] # nick_name = user_info['nick_name'] # dept_id = user_info['dept_id'] # dept_name = dept_info['dept_name'] # duties = '' sign_time = '' phone = user_info['phonenumber'] row = db.query(EventCheckin).filter(and_(EventCheckin.event_id == event_id, EventCheckin.phone == phone, EventCheckin.del_flag == '0')).first() if row is not None: check_info = get_model_dict(row) user_name = check_info['user_name'] nick_name = check_info['nick_name'] dept_id = check_info['dept_id'] dept_name = check_info['dept_name'] duties = check_info['duties'] sign_time = get_datetime_str(check_info['sign_time']) phone = check_info['phone'] print('---------------', sign_time) return { 'code': 200, 'msg': '查询成功', 'data': { 'user_id': user_info['user_id'], 'user_name': user_name, 'nick_name': nick_name, 'dept_id': dept_id, 'dept_name': dept_name, 'duties': duties, 'sign_time': sign_time, 'phone': phone } } @router.post("/check") async def check( request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), # user_id = Depends(valid_access_token) ): time.sleep(2.0) event_id = body['event_id'] nick_name = body['nick_name'] dept_name = body['dept_name'] phone = body['phone'] duties = body['duties'] type_ = body['type'] dept_id = 0 yzy_account = '' contact_info = db.query(EmergencyContactInfo).filter(and_(EmergencyContactInfo.del_flag == "0", EmergencyContactInfo.yue_gov_ease_phone == phone)).first() if contact_info is not None: yzy_account = phone contact_info = get_model_dict(contact_info) dept_id = contact_info['unit_id'] if type_ == '1': # 签名 row = db.query(EventCheckin).filter(and_(EventCheckin.event_id == event_id, EventCheckin.phone == phone)).first() if row is None: event_checkin = EventCheckin( event_id = event_id, user_id = 0, user_name = '', nick_name = nick_name, dept_id = dept_id, dept_name = dept_name, sign_time = datetime.now(), yzy_account = yzy_account, duties = duties, phone = phone, del_flag = '0' ) db.add(event_checkin) db.commit() else: row.sign_time = datetime.now() row.nick_name = nick_name row.dept_name = dept_name row.duties = duties row.phone = phone row.del_flag = '0' db.commit() return { 'code': 200, 'msg': '签到成功' } elif type_ == '2': # 取消签名 row = db.query(EventCheckin).filter(and_(EventCheckin.event_id == event_id, EventCheckin.phone == phone)).first() if row is None: return { 'code': 500, 'msg': '用户并未签名,无法签退' } row.sign_time = datetime.now() row.del_flag = '1' db.commit() return { 'code': 200, 'msg': '签退成功' } @router.get('/list') async def get_event_list( event_id: str, page: int = Query(1, gt=0, description='页码'), page_size: int = Query(10, gt=0, description='pageSize'), db: Session = Depends(get_db) ): try: # 应用查询条件 where = and_(EventCheckin.del_flag == '0', EventCheckin.event_id == event_id) # 计算总条目数 q = db.query(func.count(EventCheckin.id)) q = q.filter(where) total = q.scalar() # 执行分页查询 q = db.query(EventCheckin) q = q.filter(where) rows = q.order_by(EventCheckin.id.desc()).offset((page - 1) * page_size).limit(page_size).all() data = [ { "event_id": row.event_id, "user_id": row.user_id, "user_name": row.user_name, "nick_name": row.nick_name, "dept_id": row.dept_id, "dept_name": row.dept_name, "sign_time": get_datetime_str(row.sign_time), "yzy_account": row.yzy_account, "duties": row.duties, "phone": row.phone } for row in rows ] # 返回结果 return { "code": 200, "msg": "查询成功", "data": data, "total": total } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e))