"""Event check-in routes: QR-code generation, sign-in/sign-out and listing.

Helpers such as ``remove_xss_json``, ``get_model_dict``, ``get_datetime_str``
and ``rnd`` are not defined here; they are expected to come from the star
imports below.
"""
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from sqlalchemy.exc import IntegrityError
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, RedirectResponse
from fastapi.responses import JSONResponse
from database import get_db
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from models import *
import json
import random
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_event_management, db_user, db_area, db_emergency_plan
from common.security import valid_access_token
import traceback
from utils import *
import asyncio
from datetime import datetime, timedelta
import qrcode
from PIL import Image
from io import BytesIO
from config import settings
from common import YzyApi
from extensions import logger


router = APIRouter()


@router.get('/qrcode', response_class=StreamingResponse)
async def get_qrcode(
    request: Request,
    event_id: str,
    db: Session = Depends(get_db)
):
    """Return a PNG QR code that encodes the ``/qrcode2`` URL for this request.

    The encoded URL is the current request URL with ``qrcode`` replaced by
    ``qrcode2`` and an extra ``r`` query parameter taken from ``rnd()``.
    ``event_id`` is not read directly; it travels inside the request URL.
    """
    # NOTE(review): str.replace rewrites *every* "qrcode" occurrence in the
    # URL (host, prefix, query values), not only the final path segment.
    target_url = str(request.url).replace("qrcode", "qrcode2") + "&r=" + rnd()

    # Disabled staging override, kept for reference:
    # if settings.IS_STAGE:
    #     target_url = target_url.replace(request.url.hostname, "120.241.74.139:8086")
    #     logger.info('替换到互联网ip: {}', target_url)

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_M,
        box_size=10,
        border=0,
    )
    qr.add_data(target_url)
    qr.make(fit=True)

    png_buffer = BytesIO()
    qr.make_image().save(png_buffer, 'png')
    # Rewind so the response streams the image from its first byte.
    png_buffer.seek(0)
    return StreamingResponse(png_buffer, media_type="image/png")


@router.get('/qrcode2')
async def get_qrcode2(
    event_id: str,
    db: Session = Depends(get_db)
):
    """Redirect to the sign page of ``event_id``.

    The final location is whatever ``YzyApi.format_redirect_url`` builds from
    the relative sign-page URL.
    """
    redirect_url = "/signPage?event_id={}".format(event_id)  # business page
    logger.info("redirect_url: {}", redirect_url)
    detail_url = YzyApi.format_redirect_url(redirect_url)
    return RedirectResponse(detail_url)


@router.post("/getInfo")
async def getInfo(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return the profile used to pre-fill the sign page for the current user.

    Values come from the user's ``SysUser`` row and the matching
    ``EmergencyContactInfo`` row; if the user already has an active check-in
    for ``body['event_id']``, the stored check-in values take precedence.

    Raises:
        HTTPException: 404 when no ``SysUser`` row matches ``user_id``.
    """
    event_id = body['event_id']

    user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first()
    if user_row is None:
        # Previously the missing row was passed straight to get_model_dict.
        raise HTTPException(status_code=404, detail="user not found")
    user_info = get_model_dict(user_row)

    # Defaults for users without a contact record; `check` uses the same
    # dept_id fallback (0) when it cannot find the contact.
    nick_name = ''
    dept_id = 0
    dept_name = ''
    duties = ''

    contact_row = db.query(EmergencyContactInfo).filter(
        and_(
            EmergencyContactInfo.del_flag == "0",
            EmergencyContactInfo.yue_gov_ease_phone == user_info['yzy_account'],
        )
    ).first()
    if contact_row is not None:
        contact_info = get_model_dict(contact_row)
        nick_name = contact_info['contact_name']
        dept_id = contact_info['unit_id']
        dept_name = contact_info['unit_name']
        duties = contact_info['position']

    user_name = user_info['user_name']
    sign_time = ''
    phone = user_info['phonenumber']

    checkin_row = db.query(EventCheckin).filter(
        and_(
            EventCheckin.event_id == event_id,
            EventCheckin.phone == phone,
            EventCheckin.del_flag == '0',
        )
    ).first()
    if checkin_row is not None:
        # An active check-in exists: report what was actually submitted.
        check_info = get_model_dict(checkin_row)
        user_name = check_info['user_name']
        nick_name = check_info['nick_name']
        dept_id = check_info['dept_id']
        dept_name = check_info['dept_name']
        duties = check_info['duties']
        sign_time = get_datetime_str(check_info['sign_time'])
        phone = check_info['phone']
        logger.info("sign_time: {}", sign_time)

    return {
        'code': 200,
        'msg': '查询成功',
        'data': {
            'user_id': user_info['user_id'],
            'user_name': user_name,
            'nick_name': nick_name,
            'dept_id': dept_id,
            'dept_name': dept_name,
            'duties': duties,
            'sign_time': sign_time,
            'phone': phone
        }
    }


def _find_checkin(db, event_id, phone):
    """Return the check-in row for (event_id, phone) regardless of del_flag, or None."""
    return db.query(EventCheckin).filter(
        and_(EventCheckin.event_id == event_id, EventCheckin.phone == phone)
    ).first()


@router.post("/check")
async def check(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    # user_id = Depends(valid_access_token)
):
    """Sign in (``type == '1'``) or sign out (``type == '2'``) of an event.

    Reads ``event_id``, ``nick_name``, ``dept_name``, ``phone``, ``duties``
    and ``type`` from ``body``. Signing in creates the check-in row or
    re-activates and refreshes an existing one; signing out soft-deletes it
    by setting ``del_flag`` to ``'1'``.
    """
    # NOTE(review): the reason for the 2 s delay is not visible in this file.
    # It is kept, but awaited so it no longer blocks the event loop the way
    # time.sleep() does inside a coroutine.
    await asyncio.sleep(2.0)

    event_id = body['event_id']
    nick_name = body['nick_name']
    dept_name = body['dept_name']
    phone = body['phone']
    duties = body['duties']
    type_ = body['type']

    dept_id = 0
    yzy_account = ''
    contact_row = db.query(EmergencyContactInfo).filter(
        and_(
            EmergencyContactInfo.del_flag == "0",
            EmergencyContactInfo.yue_gov_ease_phone == phone,
        )
    ).first()
    if contact_row is not None:
        yzy_account = phone
        dept_id = get_model_dict(contact_row)['unit_id']

    if type_ == '1':  # sign in
        row = _find_checkin(db, event_id, phone)
        if row is None:
            db.add(EventCheckin(
                event_id = event_id,
                user_id = 0,
                user_name = '',
                nick_name = nick_name,
                dept_id = dept_id,
                dept_name = dept_name,
                sign_time = datetime.now(),
                yzy_account = yzy_account,
                duties = duties,
                phone = phone,
                del_flag = '0'
            ))
        else:
            # Re-activate a (possibly signed-out) row with the new details.
            row.sign_time = datetime.now()
            row.nick_name = nick_name
            row.dept_name = dept_name
            row.duties = duties
            row.phone = phone
            row.del_flag = '0'
        db.commit()
        return {
            'code': 200,
            'msg': '签到成功'
        }

    if type_ == '2':  # cancel sign-in
        row = _find_checkin(db, event_id, phone)
        if row is None:
            return {
                'code': 500,
                'msg': '用户并未签名,无法签退'
            }
        row.sign_time = datetime.now()
        row.del_flag = '1'
        db.commit()
        return {
            'code': 200,
            'msg': '签退成功'
        }

    # Any other type used to fall through and produce a `null` response body.
    return {
        'code': 500,
        'msg': '不支持的操作类型'
    }


@router.get('/list')
async def get_event_list(
    event_id: str,
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)
):
    """Return one page of active check-ins for ``event_id``, newest id first.

    The response carries the page rows in ``data`` and the number of all
    matching rows in ``total``.

    Raises:
        HTTPException: 500 carrying ``str(e)`` when anything in the query or
            serialisation fails.
    """
    try:
        # Filter shared by the count query and the page query.
        where = and_(EventCheckin.del_flag == '0', EventCheckin.event_id == event_id)

        # Total number of matching rows.
        total = db.query(func.count(EventCheckin.id)).filter(where).scalar()

        # Requested page.
        rows = (
            db.query(EventCheckin)
            .filter(where)
            .order_by(EventCheckin.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = [
            {
                "event_id": row.event_id,
                "user_id": row.user_id,
                "user_name": row.user_name,
                "nick_name": row.nick_name,
                "dept_id": row.dept_id,
                "dept_name": row.dept_name,
                "sign_time": get_datetime_str(row.sign_time),
                "yzy_account": row.yzy_account,
                "duties": row.duties,
                "phone": row.phone
            }
            for row in rows
        ]

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }

    except Exception as e:
        # Print the traceback, then surface the error text as an HTTP 500.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))