"""Event check-in endpoints: sign-in QR code, check-in / check-out, attendee list."""

# NOTE: the import order below is intentionally the original one. Several
# star imports are involved, so reordering could change which definition of a
# name wins.
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from sqlalchemy.exc import IntegrityError
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse
from fastapi.responses import JSONResponse
from database import get_db
# `func` is listed explicitly so `/list` does not depend on a star import
# happening to re-export it.
from sqlalchemy import text, exists, and_, or_, not_, func
from sqlalchemy.orm import Session
from models import *
import json
import random
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_event_management, db_user, db_area, db_emergency_plan
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
# Imported after the star imports so the name always refers to the stdlib module.
import asyncio
import qrcode
from PIL import Image
from io import BytesIO
from config import settings
from common import YzyApi
from extensions import logger

router = APIRouter()

# Artificial delay applied at the start of `/check`.
# NOTE(review): the original code slept for 3 seconds with no explanation;
# confirm whether this delay is still needed at all.
_CHECK_DELAY_SECONDS = 3


def _load_user_and_dept(db, user_id):
    """Return ``(user_info, dept_info)`` dicts for ``user_id``.

    Both rows are converted with ``get_model_dict``.
    NOTE(review): a missing user or department row is passed to
    ``get_model_dict`` as ``None``, exactly as the original code did; how that
    helper reacts is not visible from this file.
    """
    user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first()
    user_info = get_model_dict(user_row)

    dept_row = db.query(SysDept).filter(SysDept.dept_id == user_info['dept_id']).first()
    dept_info = get_model_dict(dept_row)
    return user_info, dept_info


# NOTE(review): this handler shares its Python name with the `/list` handler
# further down. Both routes still register because the decorator captures each
# function before the name is rebound. The name is kept to avoid changing the
# generated OpenAPI operation ids.
@router.get('/qrcode', response_class=StreamingResponse)
async def get_event_list(
    event_id: str,
    db: Session = Depends(get_db)
):
    """Return a PNG QR code that points at the sign-in page of ``event_id``.

    The sign-in page URL is built from ``settings.YJXP_WEB_ROOT_PATH`` and then
    passed through ``YzyApi.format_redirect_url`` before being encoded.
    """
    # Business (sign-in) page the QR code should lead to.
    redirect_url = "{}/#/signPage?event_id={}".format(settings.YJXP_WEB_ROOT_PATH, event_id)
    detail_url = YzyApi.format_redirect_url(redirect_url)

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_M,
        box_size=10,
        border=0,
    )
    qr.add_data(detail_url)
    qr.make(fit=True)

    image = qr.make_image()
    buf = BytesIO()
    image.save(buf, 'png')
    # Rewind and stream the same buffer instead of copying it into a new one.
    buf.seek(0)

    return StreamingResponse(buf, media_type="image/png")


@router.post("/getInfo")
async def getInfo(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return the caller's check-in details for ``body['event_id']``.

    Defaults come from the user and department records. When an active
    (``del_flag == '0'``) check-in row exists for this user and event, the
    values stored on that row take precedence.
    """
    event_id = body['event_id']

    user_info, dept_info = _load_user_and_dept(db, user_id)

    user_name = user_info['user_name']
    nick_name = user_info['nick_name']
    dept_id = user_info['dept_id']
    dept_name = dept_info['dept_name']
    duties = ''
    sign_time = ''
    phone = user_info['phonenumber']

    row = db.query(EventCheckin).filter(
        and_(
            EventCheckin.event_id == event_id,
            EventCheckin.user_id == user_id,
            EventCheckin.del_flag == '0',
        )
    ).first()
    if row is not None:
        check_info = get_model_dict(row)

        user_name = check_info['user_name']
        nick_name = check_info['nick_name']
        dept_id = check_info['dept_id']
        dept_name = check_info['dept_name']
        duties = check_info['duties']
        sign_time = get_datetime_str(check_info['sign_time'])
        phone = check_info['phone']

    return {
        'code': 200,
        'msg': '查询成功',
        'data': {
            'user_id': user_info['user_id'],
            'user_name': user_name,
            'nick_name': nick_name,
            'dept_id': dept_id,
            'dept_name': dept_name,
            'duties': duties,
            'sign_time': sign_time,
            'phone': phone
        }
    }


@router.post("/check")
async def check(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Toggle the caller's check-in state for ``body['event_id']``.

    * No row yet: create one with ``del_flag = '0'`` (checked in).
    * Existing row with ``del_flag == '0'``: set it to ``'1'`` (checked out).
    * Existing row with any other flag: set it back to ``'0'`` (checked in).

    ``sign_time``, ``duties`` and ``phone`` are refreshed on every toggle.
    """
    # Non-blocking sleep: a blocking `time.sleep` inside an `async def`
    # handler stalls the event loop for every other request.
    await asyncio.sleep(_CHECK_DELAY_SECONDS)

    user_info, dept_info = _load_user_and_dept(db, user_id)
    dept_id = user_info['dept_id']
    yzy_account = user_info['yzy_account']
    dept_name = dept_info['dept_name']

    event_id = body['event_id']
    phone = body['phone']
    duties = body['duties']

    row = db.query(EventCheckin).filter(
        and_(EventCheckin.event_id == event_id, EventCheckin.user_id == user_id)
    ).first()

    if row is None:
        event_checkin = EventCheckin(
            event_id=event_id,
            user_id=user_id,
            user_name=user_info['user_name'],
            nick_name=user_info['nick_name'],
            dept_id=dept_id,
            dept_name=dept_name,
            sign_time=datetime.now(),
            yzy_account=yzy_account,
            duties=duties,
            phone=phone,
            del_flag='0'
        )
        db.add(event_checkin)
        db.commit()

        return {
            'code': 200,
            'msg': '签到成功'
        }

    checking_out = row.del_flag == '0'

    row.sign_time = datetime.now()
    # Plain assignments: the original lines ended in commas, which stored
    # 1-tuples such as `(duties,)` instead of the values themselves.
    row.duties = duties
    row.phone = phone
    row.del_flag = '1' if checking_out else '0'
    db.commit()

    return {
        'code': 200,
        'msg': '签退成功' if checking_out else '签到成功'
    }


@router.get('/list')
async def get_event_list(
    event_id: str,
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)
):
    """Return one page of active check-in rows for ``event_id``, newest id first.

    The response also carries ``total``, the number of active rows for the
    event regardless of paging.

    Raises:
        HTTPException: status 500 with the error text when anything fails.
    """
    try:
        # Filter: active rows of the requested event.
        where = and_(EventCheckin.del_flag == '0', EventCheckin.event_id == event_id)

        # Total number of matching rows.
        total = db.query(func.count(EventCheckin.id)).filter(where).scalar()

        # Paginated query.
        rows = (
            db.query(EventCheckin)
            .filter(where)
            .order_by(EventCheckin.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = [
            {
                "event_id": row.event_id,
                "user_id": row.user_id,
                "user_name": row.user_name,
                "nick_name": row.nick_name,
                "dept_id": row.dept_id,
                "dept_name": row.dept_name,
                "sign_time": get_datetime_str(row.sign_time),
                "yzy_account": row.yzy_account,
                "duties": row.duties
            }
            for row in rows
        ]

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }

    except Exception as e:
        # Route-level boundary: log the traceback and surface a 500.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e