#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Endpoints for creating an information-publish request and listing existing ones."""

# NOTE: import order is kept exactly as in the original file. Several wildcard
# imports are in play, so reordering could change which definition a name
# (e.g. ``datetime``) finally resolves to.
from fastapi import APIRouter, Request, Depends, HTTPException, Query
from sqlalchemy.exc import IntegrityError
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.responses import JSONResponse
from database import get_db
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from models import *
import json
import random
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_event_management, db_user, db_area, db_emergency_plan
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
from common import YzyApi
from common.db import db_dict
from urllib.parse import quote
import base64
from config import settings
# Imported explicitly, and after the wildcard imports, so that ``func.count``
# below does not depend on some other module happening to re-export ``func``.
from sqlalchemy import func


router = APIRouter()


def _find_user(db, user_id):
    """Return the first SysUser row whose user_id matches, or None."""
    return db.query(SysUser).filter(SysUser.user_id == user_id).first()


def _find_dept(db, dept_id):
    """Return the first SysDept row whose dept_id matches, or None."""
    return db.query(SysDept).filter(SysDept.dept_id == dept_id).first()


@router.post('/create')
async def create_emergency_plan(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create a publish record plus one response record per recipient.

    The InfoPublishBase row and every InfoPublishResponses row are written in
    a single transaction: either all of them are committed or none are.

    Args:
        db: SQLAlchemy session provided by ``get_db``.
        body: Request payload as returned by ``remove_xss_json``. The keys read
            here are ``title``, ``publish_group``, ``template_id``, ``content``,
            ``response_type``, ``publish_time``, ``examine_by``,
            ``publish_channel``, ``user_count`` and ``users`` (an iterable of
            mappings that each carry ``user_id``).
        user_id: Id of the caller as returned by ``valid_access_token``; stored
            as ``recorded_by``.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (the id of the new record).

    Raises:
        HTTPException: 400 when a recipient ``user_id`` matches no SysUser row;
            500 for any other failure (the transaction is rolled back first).
    """
    try:
        # Defaults used when the creator or the creator's department row
        # cannot be found.
        dept_id = 0
        dept_name = ''
        creator = _find_user(db, user_id)
        if creator is not None:
            dept_id = creator.dept_id
            creator_dept = _find_dept(db, dept_id)
            if creator_dept is not None:
                dept_name = creator_dept.dept_name

        new_publish = InfoPublishBase(
            title = body['title'],
            publish_group = body['publish_group'],
            template_id = body['template_id'],
            content = body['content'],
            recorded_by = user_id,
            del_flag = '0',
            dept_id = dept_id,
            dept_name = dept_name,
            add_time = datetime.now(),
            response_type = body['response_type'],
            publish_time = body['publish_time'],
            examine_by = body['examine_by'],
            publish_status = 0,
            examine_status = 0,
            publish_channel = body['publish_channel'],
            user_count = body['user_count'],
            user_ok_count = 0,
            user_err_count = 0,
            user_sending_count = 0
        )
        db.add(new_publish)
        # flush() sends the INSERT so the generated primary key is available,
        # but leaves the transaction open so a later failure rolls it back too.
        db.flush()
        new_publish_id = new_publish.id

        # Recipients. Dedicated local names are used so the authenticated
        # caller's ``user_id`` is not overwritten inside the loop.
        for recipient in body['users']:
            recipient_id = recipient['user_id']
            recipient_row = _find_user(db, recipient_id)
            if recipient_row is None:
                raise HTTPException(
                    status_code=400,
                    detail='recipient user not found: {}'.format(recipient_id)
                )
            recipient_dept = _find_dept(db, recipient_row.dept_id)
            new_resp = InfoPublishResponses(
                publish_id = new_publish_id,
                user_id = recipient_id,
                user_name = recipient_row.user_name,
                nick_name = recipient_row.nick_name,
                dept_name = recipient_dept.dept_name if recipient_dept is not None else '',
                sent_time = 0,
                response_type = body['response_type']
            )
            db.add(new_resp)

        # TODO: attachments (body['attachs']) and approval attachments
        # (body['examine_attachs']) are accepted in the payload but are not
        # persisted yet.

        db.commit()

        return {
            "code": 200,
            "msg": "创建成功",
            "data": new_publish_id
        }

    except HTTPException:
        # Deliberate HTTP errors keep their status code; just undo the writes.
        db.rollback()
        raise
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))


@router.get('/list')
async def get_publish_list(
    publish_group: str = Query('', description='发布单位'),
    publish_status: str = Query('', description='发布状态的字典键值'),
    examine_status: str = Query('', description='审批状态的字典键值'),
    sort_by: str = Query('', description='排序字段'),
    sort_order: str = Query("asc", description='排序方式'),
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)
):
    """Return one page of non-deleted publish records, newest id first.

    Args:
        publish_group: Substring filter on ``publish_group``; '' disables it.
        publish_status: Exact filter on ``publish_status``; '' disables it.
        examine_status: Exact filter on ``examine_status``; '' disables it.
        sort_by: Accepted but not applied; see the note below.
        sort_order: Accepted but not applied; see the note below.
        page: 1-based page number.
        page_size: Number of rows per page.
        db: SQLAlchemy session provided by ``get_db``.

    Returns:
        A dict with ``code``, ``msg``, ``data`` (list of row dicts) and
        ``total`` (number of rows matching the filters, ignoring paging).

    Raises:
        HTTPException: 500 wrapping any failure.

    NOTE(review): ``sort_by`` / ``sort_order`` are part of the signature but
    the query always orders by ``id`` descending. Confirm whether sorting is
    meant to be implemented.
    """
    try:
        # Build the filter conditions.
        conditions = [InfoPublishBase.del_flag == '0']
        if publish_status != '':
            conditions.append(InfoPublishBase.publish_status == publish_status)
        if examine_status != '':
            conditions.append(InfoPublishBase.examine_status == examine_status)
        if publish_group != '':
            conditions.append(InfoPublishBase.publish_group.like('%{}%'.format(publish_group)))
        where = and_(*conditions)

        # Total number of matching rows.
        total = db.query(func.count(InfoPublishBase.id)).filter(where).scalar()

        # Paginated query.
        rows = (
            db.query(InfoPublishBase)
            .filter(where)
            .order_by(InfoPublishBase.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        # Per-request memo so each distinct user / department is queried once
        # instead of once per listed row.
        user_cache = {}
        dept_cache = {}

        def cached_user(uid):
            if uid not in user_cache:
                user_cache[uid] = _find_user(db, uid)
            return user_cache[uid]

        def cached_dept(did):
            if did not in dept_cache:
                dept_cache[did] = _find_dept(db, did)
            return dept_cache[did]

        data = []
        for row in rows:
            # Applicant (the user who recorded the publish request).
            nick_name = ""
            dept_name = ""
            applicant = cached_user(row.recorded_by)
            if applicant is not None:
                nick_name = applicant.nick_name
                applicant_dept = cached_dept(applicant.dept_id)
                if applicant_dept is not None:
                    dept_name = applicant_dept.dept_name

            # Pending handler (the user referenced by examine_by).
            examine_user = "无"
            examiner = cached_user(row.examine_by)
            if examiner is not None:
                examine_user = examiner.nick_name

            data.append({
                "id": row.id,
                "title": row.title,
                "publish_group": row.publish_group,
                "content": row.content,
                "publish_time": get_datetime_str(row.publish_time),
                "publish_channel": row.publish_channel,
                "nick_name": nick_name,
                "dept_name": dept_name,
                "examine_user": examine_user,
                "publish_status": db_dict.get_dict_label(db, "mm_publish_status", row.publish_status),
                "examine_status": db_dict.get_dict_label(db, "mm_examine_status", row.examine_status),
                "user_count": row.user_count,
                "user_ok_count": row.user_ok_count,
                "user_err_count": row.user_err_count,
                "user_sending_count": row.user_sending_count
            })

        # Result envelope.
        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))