123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- from fastapi import APIRouter, Request, Depends, HTTPException, Query
- from sqlalchemy.exc import IntegrityError
- from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse, RedirectResponse
- from fastapi.responses import JSONResponse
- from database import get_db
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import func
- from models import *
- import json
- import random
- from sqlalchemy import create_engine, select
- from typing import Optional
- from utils.StripTagsHTMLParser import *
- from common.db import db_event_management, db_user, db_area, db_emergency_plan
- from common.security import valid_access_token
- import traceback
- from utils import *
- from datetime import datetime, timedelta
- import qrcode
- from PIL import Image
- from io import BytesIO
- from config import settings
- from common import YzyApi
- from extensions import logger
- router = APIRouter()
@router.get('/qrcode', response_class=StreamingResponse)
async def get_qrcode(
    request: Request,
    event_id: str,
    db: Session = Depends(get_db)
):
    """Render a PNG QR code whose payload is the matching ``/qrcode2`` URL.

    The payload is this request's own URL with the last ``qrcode`` before the
    query string rewritten to ``qrcode2`` and an ``r`` parameter appended.
    The ``r`` value comes from ``rnd()``, which is defined outside this block
    (presumably a random cache-busting token -- confirm).

    Args:
        request: Incoming request; its full URL is the basis of the payload.
        event_id: Required query parameter. It is not read here directly; it
            travels to ``/qrcode2`` as part of the copied query string.
        db: Database session dependency (not used by this handler).

    Returns:
        A ``StreamingResponse`` with media type ``image/png``.
    """
    # Split off the query string first so that a parameter value that happens
    # to contain "qrcode" (for example an event_id) is never rewritten.
    base, sep, query = str(request.url).partition("?")

    # Rewrite only the last "qrcode" in the scheme/host/path part, i.e. this
    # route's own path segment, instead of every occurrence in the URL.
    head, marker, tail = base.rpartition("qrcode")
    if marker:
        base = f"{head}qrcode2{tail}"

    # "&r=" relies on the query string being non-empty, which the required
    # event_id parameter guarantees for any request that reaches this handler.
    url = f"{base}{sep}{query}&r={rnd()}"

    # NOTE: a staging-only rewrite of the hostname to a public IP used to be
    # applied here; it is currently disabled.

    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_M,
        box_size=10,
        border=0,
    )
    qr.add_data(url)
    # fit=True lets the library pick a larger version if the URL does not fit.
    qr.make(fit=True)

    # Write the PNG once and rewind, rather than copying the bytes into a
    # second buffer.
    buf = BytesIO()
    qr.make_image().save(buf, 'png')
    buf.seek(0)
    return StreamingResponse(buf, media_type="image/png")
@router.get('/qrcode2')
async def get_qrcode2(
    event_id: str,
    db: Session = Depends(get_db)
):
    """Redirect a scanned QR code to the sign-in page of an event.

    Args:
        event_id: Identifier of the event, copied into the sign page URL.
        db: Database session dependency (not used by this handler).

    Returns:
        A ``RedirectResponse`` to whatever ``YzyApi.format_redirect_url``
        produces for the relative sign page URL.
    """
    # Relative URL of the business (sign-in) page.
    sign_page = f"/signPage?event_id={event_id}"
    logger.info("redirect_url: {}", sign_page)

    target = YzyApi.format_redirect_url(sign_page)
    return RedirectResponse(target)
@router.post("/getInfo")
async def getInfo(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return the sign-in profile of the authenticated user for one event.

    The profile is assembled in three steps:

    1. The ``SysUser`` row for ``user_id`` supplies ``user_name`` and
       ``phonenumber``.
    2. The active ``EmergencyContactInfo`` row whose ``yue_gov_ease_phone``
       equals the user's ``yzy_account`` supplies name, unit and position.
    3. If an active ``EventCheckin`` row exists for this event and phone, its
       stored values override everything above and provide ``sign_time``.

    Args:
        request: Incoming request (not used by this handler).
        db: Database session dependency.
        body: Sanitised JSON body; must contain ``event_id``.
        user_id: Id of the caller as resolved by ``valid_access_token``.

    Returns:
        ``{'code': 200, 'msg': ..., 'data': {...}}`` on success, or
        ``{'code': 500, 'msg': ...}`` when the user row no longer exists.
    """
    event_id = body['event_id']

    user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first()
    if user_row is None:
        # A valid token can outlive its user row; answer with the same
        # code/msg envelope the other handlers use instead of crashing.
        return {
            'code': 500,
            'msg': '用户不存在'
        }
    user_info = get_model_dict(user_row)

    contact_row = db.query(EmergencyContactInfo).filter(and_(
        EmergencyContactInfo.del_flag == "0",
        EmergencyContactInfo.yue_gov_ease_phone == user_info['yzy_account'],
    )).first()
    if contact_row is not None:
        contact_info = get_model_dict(contact_row)
        nick_name = contact_info['contact_name']
        dept_id = contact_info['unit_id']
        dept_name = contact_info['unit_name']
        duties = contact_info['position']
    else:
        # Not every user has a contact record. Fall back to the same blank
        # defaults the /check handler uses when the lookup finds nothing.
        nick_name = ''
        dept_id = 0
        dept_name = ''
        duties = ''

    user_name = user_info['user_name']
    sign_time = ''
    phone = user_info['phonenumber']

    checkin_row = db.query(EventCheckin).filter(and_(
        EventCheckin.event_id == event_id,
        EventCheckin.phone == phone,
        EventCheckin.del_flag == '0',
    )).first()
    if checkin_row is not None:
        # The user already signed in: show what was recorded at that time.
        check_info = get_model_dict(checkin_row)
        user_name = check_info['user_name']
        nick_name = check_info['nick_name']
        dept_id = check_info['dept_id']
        dept_name = check_info['dept_name']
        duties = check_info['duties']
        sign_time = get_datetime_str(check_info['sign_time'])
        phone = check_info['phone']

    # Diagnostic output goes through the logger rather than a bare print.
    logger.debug("sign_time: {}", sign_time)

    return {
        'code': 200,
        'msg': '查询成功',
        'data': {
            'user_id': user_info['user_id'],
            'user_name': user_name,
            'nick_name': nick_name,
            'dept_id': dept_id,
            'dept_name': dept_name,
            'duties': duties,
            'sign_time': sign_time,
            'phone': phone
        }
    }
@router.post("/check")
async def check(
    request: Request,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    # NOTE: authentication is currently disabled for this endpoint.
    # user_id = Depends(valid_access_token)
):
    """Sign a participant in to an event, or cancel an existing sign-in.

    Check-in rows are keyed by ``(event_id, phone)`` and soft-deleted through
    ``del_flag`` (``'0'`` active, ``'1'`` cancelled). Signing in again after a
    cancellation reactivates the existing row.

    Args:
        request: Incoming request (not used by this handler).
        db: Database session dependency.
        body: Sanitised JSON body with the keys ``event_id``, ``nick_name``,
            ``dept_name``, ``phone``, ``duties`` and ``type``. ``type`` is
            ``'1'`` to sign in and ``'2'`` to cancel.

    Returns:
        ``{'code': 200, 'msg': ...}`` on success. ``{'code': 500, 'msg': ...}``
        when ``type`` is not supported or when cancelling a sign-in that does
        not exist.
    """
    # Function-scope import: only this handler needs asyncio.
    import asyncio

    # Keep the deliberate 2 s delay, but yield to the event loop while waiting.
    # A blocking time.sleep() here would stall every other request.
    await asyncio.sleep(2.0)

    event_id = body['event_id']
    nick_name = body['nick_name']
    dept_name = body['dept_name']
    phone = body['phone']
    duties = body['duties']
    type_ = body['type']

    if type_ not in ('1', '2'):
        # Previously an unknown type fell through and the client got `null`.
        return {
            'code': 500,
            'msg': '不支持的操作类型'
        }

    # Both operations act on the same row, so look it up once.
    row = db.query(EventCheckin).filter(and_(
        EventCheckin.event_id == event_id,
        EventCheckin.phone == phone,
    )).first()

    if type_ == '2':
        # Cancel the sign-in.
        if row is None:
            return {
                'code': 500,
                'msg': '用户并未签名,无法签退'
            }

        row.sign_time = datetime.now()
        row.del_flag = '1'
        db.commit()
        return {
            'code': 200,
            'msg': '签退成功'
        }

    # type_ == '1': sign in.
    if row is None:
        # The contact directory is only consulted for a brand-new row. A match
        # on phone supplies the unit id and marks the phone as the account.
        dept_id = 0
        yzy_account = ''
        contact_row = db.query(EmergencyContactInfo).filter(and_(
            EmergencyContactInfo.del_flag == "0",
            EmergencyContactInfo.yue_gov_ease_phone == phone,
        )).first()
        if contact_row is not None:
            yzy_account = phone
            dept_id = get_model_dict(contact_row)['unit_id']

        db.add(EventCheckin(
            event_id=event_id,
            user_id=0,
            user_name='',
            nick_name=nick_name,
            dept_id=dept_id,
            dept_name=dept_name,
            sign_time=datetime.now(),
            yzy_account=yzy_account,
            duties=duties,
            phone=phone,
            del_flag='0',
        ))
    else:
        # Refresh the existing row and reactivate it if it was cancelled.
        row.sign_time = datetime.now()
        row.nick_name = nick_name
        row.dept_name = dept_name
        row.duties = duties
        row.phone = phone
        row.del_flag = '0'

    db.commit()
    return {
        'code': 200,
        'msg': '签到成功'
    }
-
@router.get('/list')
async def get_event_list(
    event_id: str,
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)
):
    """Return one page of active check-ins for an event, newest id first.

    Args:
        event_id: Event whose check-ins are listed.
        page: 1-based page number.
        page_size: Number of rows per page.
        db: Database session dependency.

    Returns:
        ``{"code": 200, "msg": ..., "data": [...], "total": n}`` where
        ``total`` counts all active check-ins of the event, not just this page.

    Raises:
        HTTPException: Status 500 carrying the error text when anything in the
            query or serialisation fails.
    """

    def serialize(record):
        # Flatten one EventCheckin row into the JSON shape of the response.
        return {
            "event_id": record.event_id,
            "user_id": record.user_id,
            "user_name": record.user_name,
            "nick_name": record.nick_name,
            "dept_id": record.dept_id,
            "dept_name": record.dept_name,
            "sign_time": get_datetime_str(record.sign_time),
            "yzy_account": record.yzy_account,
            "duties": record.duties,
            "phone": record.phone
        }

    try:
        # Shared filter: rows of this event that are not soft-deleted.
        active_for_event = and_(
            EventCheckin.del_flag == '0',
            EventCheckin.event_id == event_id,
        )

        # Total number of matching rows.
        total = (
            db.query(func.count(EventCheckin.id))
            .filter(active_for_event)
            .scalar()
        )

        # Fetch the requested page.
        skip = (page - 1) * page_size
        page_rows = (
            db.query(EventCheckin)
            .filter(active_for_event)
            .order_by(EventCheckin.id.desc())
            .offset(skip)
            .limit(page_size)
            .all()
        )
        data = [serialize(record) for record in page_rows]

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }
    except Exception as exc:
        # Print the traceback, then surface the failure as an HTTP 500.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(exc))
|