"""Event check-in endpoints: sign-in QR code, check-in toggle, and check-in list."""

# NOTE: the import order below is kept exactly as in the original file because
# several wildcard imports are involved and later imports shadow earlier ones.
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from sqlalchemy.exc import IntegrityError
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse
from fastapi.responses import JSONResponse
from database import get_db
# `func` is imported explicitly because the list endpoint uses `func.count`;
# previously it was only available if one of the wildcard imports leaked it.
from sqlalchemy import text, exists, and_, or_, not_, func
from sqlalchemy.orm import Session
from models import *
import json
import random
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_event_management, db_user, db_area, db_emergency_plan
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
import qrcode
from PIL import Image
from io import BytesIO
from urllib.parse import quote as url_quote
from config import settings
from common import YzyApi
from extensions import logger


router = APIRouter()


# NOTE(review): this handler shares its function name with the `/list` handler
# further down. Both routes are registered (the decorator runs before the name
# is rebound), but the module-level name ends up pointing at the `/list`
# handler. The name is kept to avoid changing generated OpenAPI operation ids.
@router.get('/qrcode', response_class=StreamingResponse)
async def get_event_list(
    event_id: str,
    db: Session = Depends(get_db)
):
    """Return a PNG QR code that points at the sign-in page of an event.

    Args:
        event_id: Identifier of the event, embedded in the sign page URL.
        db: Database session dependency (not used by this handler; kept so
            the signature stays unchanged).

    Returns:
        A ``StreamingResponse`` with media type ``image/png``.
    """
    # event_id comes straight from the query string, so percent-encode it
    # before embedding it into another URL; otherwise characters such as
    # '&', '#' or spaces would corrupt the redirect target.
    redirect_url = "{}/#/signPage?event_id={}".format(
        settings.YJXP_WEB_ROOT_PATH, url_quote(event_id, safe='')
    )  # business page

    # NOTE(review): presumably wraps the page URL in the YZY redirect/auth
    # link -- confirm against common.YzyApi.
    detail_url = YzyApi.format_redirect_url(redirect_url)

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_M,
        box_size=10,
        border=0,
    )
    qr.add_data(detail_url)
    qr.make(fit=True)

    image = qr.make_image()
    buf = BytesIO()
    image.save(buf, 'png')
    # Rewind and stream the same buffer instead of copying the bytes into a
    # second BytesIO.
    buf.seek(0)

    return StreamingResponse(buf, media_type="image/png")


@router.post("/check")
async def check(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Toggle the calling user's check-in record for an event.

    Behaviour:
        * No ``EventCheckin`` row for (event_id, user_id): a new row is
          inserted with ``del_flag = '0'``.
        * Existing row with ``del_flag == '0'``: it is flipped to ``'1'``
          and the "checked out" message is returned.
        * Existing row with any other ``del_flag``: it is flipped back to
          ``'0'``.
        ``sign_time`` is refreshed to the current time in every case.

    Args:
        request: Incoming request (unused; kept for signature compatibility).
        db: Database session dependency.
        body: Request payload; must provide ``event_id`` and ``yzy_account``.
        user_id: Id of the caller as resolved by ``valid_access_token``.

    Returns:
        ``{'code': 200, 'msg': ...}`` describing the action performed.

    Raises:
        HTTPException: 400 when a required body field is missing, 404 when
            the user or the user's department cannot be found.
    """
    # Validate the payload first so a malformed request fails with a clear
    # 400 instead of an unhandled KeyError (HTTP 500).
    try:
        event_id = body['event_id']
        yzy_account = body['yzy_account']
    except KeyError as exc:
        raise HTTPException(
            status_code=400, detail="缺少参数: {}".format(exc.args[0])
        ) from exc

    user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first()
    if user_row is None:
        # Previously a missing user fell through to get_model_dict(None) and
        # crashed with an opaque server error.
        raise HTTPException(status_code=404, detail="用户不存在")
    user_info = get_model_dict(user_row)
    dept_id = user_info['dept_id']

    dept_row = db.query(SysDept).filter(SysDept.dept_id == dept_id).first()
    if dept_row is None:
        raise HTTPException(status_code=404, detail="部门不存在")
    dept_name = get_model_dict(dept_row)['dept_name']

    checkin = db.query(EventCheckin).filter(
        and_(EventCheckin.event_id == event_id, EventCheckin.user_id == user_id)
    ).first()

    if checkin is None:
        # First check-in of this user for this event.
        db.add(EventCheckin(
            event_id=event_id,
            user_id=user_id,
            user_name=user_info['user_name'],
            nick_name=user_info['nick_name'],
            dept_id=dept_id,
            dept_name=dept_name,
            sign_time=datetime.now(),
            yzy_account=yzy_account,
            del_flag='0'
        ))
        db.commit()
        return {
            'code': 200,
            'msg': '签名成功'
        }

    # Existing record: flip the flag. '0' marks an active check-in (this is
    # the value the list endpoint filters on); anything else is re-activated.
    checking_out = checkin.del_flag == '0'
    checkin.sign_time = datetime.now()
    checkin.del_flag = '1' if checking_out else '0'
    db.commit()

    return {
        'code': 200,
        'msg': '签出成功' if checking_out else '签名成功'
    }


@router.get('/list')
async def get_event_list(
    event_id: str,
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)
):
    """Return one page of active check-in records for an event.

    Only rows with ``del_flag == '0'`` are counted and returned, ordered by
    ``EventCheckin.id`` descending.

    Args:
        event_id: Event whose check-ins are listed.
        page: 1-based page number.
        page_size: Number of rows per page.
        db: Database session dependency.

    Returns:
        ``{"code": 200, "msg": ..., "data": [...], "total": <int>}`` where
        ``total`` is the number of matching rows across all pages.

    Raises:
        HTTPException: 500 with the error text when the query fails.
    """
    try:
        # Shared filter for both the count and the page query.
        where = and_(EventCheckin.del_flag == '0', EventCheckin.event_id == event_id)

        # Total number of matching rows.
        total = db.query(func.count(EventCheckin.id)).filter(where).scalar()

        # Fetch the requested page, newest id first.
        rows = (
            db.query(EventCheckin)
            .filter(where)
            .order_by(EventCheckin.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = [
            {
                "event_id": row.event_id,
                "user_id": row.user_id,
                "user_name": row.user_name,
                "nick_name": row.nick_name,
                "dept_id": row.dept_id,
                "dept_name": row.dept_name,
                "sign_time": get_datetime_str(row.sign_time),
                "yzy_account": row.yzy_account
            }
            for row in rows
        ]

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }

    except Exception as e:
        # Top-level boundary of the handler: dump the traceback and convert
        # the failure into an HTTP 500, keeping the original cause chained.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e