#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, HTTPException, Query from fastapi.responses import HTMLResponse, FileResponse from fastapi.responses import JSONResponse from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from models import * import json import os from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from common.db import db_event_management, db_user, db_area, db_emergency_plan from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta from common import YzyApi from common.db import db_dict from urllib.parse import quote import base64 from config import settings router = APIRouter() EXAMINE_TYPE_DICT = { 0: "草稿", 10: "提交", 20: "领导审批", 30: "重新提交" } EXAMINE_SUB_TYPE_DICT = { 0: "草稿", 10: "提交", 20: "待审批", 21: "审批通过", 22: "审批不通过", 30: "重新提交" } # 信息发布创建 @router.post('/create') async def create_emergency_plan( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: dept_id = 0 dept_name = '' user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first() user_name = user_row.user_name nick_name = user_row.nick_name dept_id = user_row.dept_id dept_row = db.query(SysDept).filter(SysDept.dept_id == dept_id).first() dept_name = dept_row.dept_name examine_user_row = db.query(SysUser).filter(SysUser.user_name == body['examine_user']).first() if examine_user_row is None: return { "code": 500, "msg": "审批人员账号不存在" } examine_by = examine_user_row.user_id new_publish = InfoPublishBase( title = body['title'], publish_group = body['publish_group'], template_id = body['template_id'], content = body['content'], recorded_by = user_id, del_flag = '0', dept_id = dept_id, dept_name = dept_name, add_time = datetime.now(), response_type = body['response_type'], publish_time = body['publish_time'], examine_by = examine_by, publish_status = 0, examine_status = 0, publish_channel = body['publish_channel'], user_count = body['user_count'], user_ok_count = 0, user_err_count = 0, user_sending_count = 0, info_type = body['info_type'] ) db.add(new_publish) db.commit() db.refresh(new_publish) new_publish_id = new_publish.id # 发送人员 for u in body['users']: send_user_id = u['user_id'] send_nick_name = u['nick_name'] send_user_name = '' send_dept_name = '' send_yzy_account = '' user_row = db.query(SysUser).filter(SysUser.user_id == send_user_id).first() if user_row is not None: send_user_name = user_row.user_name send_dept_row = db.query(SysDept).filter(SysDept.dept_id == user_row.dept_id).first() send_dept_name = send_dept_row.dept_name send_yzy_account = user_row.yzy_account if send_yzy_account is None or send_yzy_account == "": send_yzy_account = user_row.phonenumber new_resp = InfoPublishResponses( publish_id = new_publish_id, user_id = send_user_id, user_name = send_user_name, nick_name = send_nick_name, dept_name = send_dept_name, sent_status = 0, yzy_account = send_yzy_account, response_type = body['response_type'], publish_channel = body['publish_channel'] ) db.add(new_resp) db.commit() # 附件 if 'attachs' in body: infopublish_files = [ InfoPublishFile( file_name=fileName["name"], storage_file_name=fileName["url"], file_path=f'/data/upload/mergefile/uploads/{fileName["url"]}', file_size=os.path.getsize(f'/data/upload/mergefile/uploads/{fileName["url"]}'), foreign_key=str(new_publish_id), from_scenario="infopublish_attach_file", update_time=datetime.now(), create_time=datetime.now(), create_by=user_id, create_dept=dept_id, del_flag='0', status=0, ) for fileName in body['attachs'] ] db.add_all(infopublish_files) db.commit() # 审批附件 if 'examine_attachs' in body: infopublish_files = [ InfoPublishFile( file_name=fileName["name"], storage_file_name=fileName["url"], file_path=f'/data/upload/mergefile/uploads/{fileName["url"]}', file_size=os.path.getsize(f'/data/upload/mergefile/uploads/{fileName["url"]}'), foreign_key=str(new_publish_id), from_scenario="infopublish_examine_attach_file", update_time=datetime.now(), create_time=datetime.now(), create_by=user_id, create_dept=dept_id, del_flag='0', status=0, ) for fileName in body['examine_attachs'] ] db.add_all(infopublish_files) db.commit() # 审批记录 infopublish_examine = InfoPublishExamine( publish_id = new_publish_id, examine_type = 10, # 提交 examine_sub_type = 10, # 提交 examine_time = datetime.now(), content = '', user_id = user_id, user_name = user_name, nick_name = nick_name, del_flag = '0' ) db.add(infopublish_examine) db.commit() # 待审批状态 infopublish_examine = InfoPublishExamine( publish_id = new_publish_id, examine_type = 20, # 审批 examine_sub_type = 20, # 待审批 examine_time = datetime.now() + timedelta(seconds=1), content = '', user_id = examine_by, user_name = examine_user_row.user_name, nick_name = examine_user_row.nick_name, del_flag = '0' ) db.add(infopublish_examine) db.commit() # 改审核中、待审批状态 db.query(InfoPublishBase).filter(InfoPublishBase.id == new_publish_id).update({"publish_status": 2, "examine_status": 1}) db.commit() return { "code": 200, "msg": "信息创建成功", "data": new_publish_id } except Exception as e: traceback.print_exc() # 处理异常 raise HTTPException(status_code=500, detail=str(e)) # 信息发布分页查询 @router.get('/list') async def get_publish_list( publish_group: str = Query('', description='发布单位'), publish_status: str = Query('', description='发布状态的字典键值'), examine_status: str = Query('', description='审批状态的字典键值'), dispose_status: str = Query('', description='处理状态的字典键值'), content: str = Query('', description='信息内容'), sort_by: str = Query('', description='排序字段'), sort_order: str = Query("asc", description='排序方式'), page: int = Query(1, gt=0, description='页码'), page_size: int = Query(10, gt=0, description='pageSize'), db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 应用查询条件 where = and_(InfoPublishBase.del_flag == '0') if content != '': where = and_(where, InfoPublishBase.content.like('%{}%'.format(content))) if publish_status not in ['', '0'] : where = and_(where, InfoPublishBase.publish_status == publish_status) if examine_status not in ['', '0'] : where = and_(where, InfoPublishBase.examine_status == examine_status) if publish_group != '': where = and_(where, InfoPublishBase.publish_group.like('%{}%'.format(publish_group))) if dispose_status == '1' : # 1 待处理 # 审核类型 20 待审批 subquery = db.query(InfoPublishExamine.publish_id).filter(InfoPublishExamine.del_flag == "0").filter(InfoPublishExamine.user_id == user_id).filter(InfoPublishExamine.examine_sub_type == 20).subquery() where = and_(where, InfoPublishBase.id == subquery.c.publish_id) if dispose_status == '2' : # 2 已处理 # 审核类型 21 审批通过 22 审批不通过 subquery = db.query(InfoPublishExamine.publish_id).filter(InfoPublishExamine.del_flag == "0").filter(InfoPublishExamine.user_id == user_id).filter(InfoPublishExamine.examine_sub_type.in_([21,22])).subquery() where = and_(where, InfoPublishBase.id == subquery.c.publish_id) # 计算总条目数 q = db.query(func.count(InfoPublishBase.id)) q = q.filter(where) total = q.scalar() # 执行分页查询 q = db.query(InfoPublishBase) q = q.filter(where) rows = q.order_by(InfoPublishBase.id.desc()).offset((page - 1) * page_size).limit(page_size).all() data = [] for row in rows: # 发布申请人 recorded_by = row.recorded_by user_row = db.query(SysUser).filter(SysUser.user_id == recorded_by).first() nick_name = "" dept_name = "" if user_row is not None: nick_name = user_row.nick_name dept_id = user_row.dept_id dept_row = db.query(SysDept).filter(SysDept.dept_id == dept_id).first() if dept_row is not None: dept_name = dept_row.dept_name # 待处理人 examine_user = "无" examine_by = row.examine_by user_row = db.query(SysUser).filter(SysUser.user_id == examine_by).first() if user_row is not None: examine_user = user_row.nick_name # 是否我的审批事项(待审批) is_my_examine = 0 examine_row = db.query(InfoPublishExamine).filter(InfoPublishExamine.del_flag == "0").filter(InfoPublishExamine.user_id == user_id).filter(InfoPublishExamine.examine_sub_type == 20).order_by(InfoPublishExamine.id.desc()).limit(1).first() if examine_row is not None: is_my_examine = 1 data.append({ "id": row.id, "title": row.title, "info_type": row.info_type, "publish_group": row.publish_group, "content": row.content, "publish_time": get_datetime_str(row.publish_time), "publish_channel": row.publish_channel, "nick_name": nick_name, "dept_name": dept_name, "examine_user": examine_user, "publish_status": row.publish_status, # db_dict.get_dict_label(db, "mm_publish_status", row.publish_status), "examine_status": row.examine_status, # db_dict.get_dict_label(db, "mm_examine_status", row.examine_status), "user_count": row.user_count, "user_ok_count": row.user_ok_count, "user_err_count": row.user_err_count, "user_sending_count": row.user_sending_count, "is_my_edit": (row.examine_status == 0 or row.examine_status == 9) and row.recorded_by == user_id, # 是否我的编辑事项 "is_my_examine": is_my_examine }) # 返回结果 return { "code": 200, "msg": "查询成功", "data": data, "total": total } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) # 信息发布查看 @router.get('/edit') async def get_edit_info( request: Request, info_id: str = Query(None, description='信息ID'), db: Session = Depends(get_db)): row = db.query(InfoPublishBase).filter(InfoPublishBase.id == info_id).first() data = get_model_dict(row) examine_time = add_time = data['add_time'] data['examine_user'] = db_user.get_user_name_by_id(db, data['examine_by']) data['add_time'] = get_datetime_str(data['add_time']) data['publish_time'] = get_datetime_str(data['publish_time']) # 反馈 && 未反馈 data['feedback_count'] = db.query(InfoPublishResponses).filter(and_(InfoPublishResponses.publish_id == info_id, InfoPublishResponses.sent_status > 0, InfoPublishResponses.response_type > 0)).count() data['unresponsive_count'] = db.query(InfoPublishResponses).filter(and_(InfoPublishResponses.publish_id == info_id, InfoPublishResponses.sent_status > 0, InfoPublishResponses.response_type > 0)).count() # 附件 rows = db.query(InfoPublishFile).filter(and_(InfoPublishFile.from_scenario=="infopublish_attach_file", InfoPublishFile.foreign_key == info_id, InfoPublishFile.del_flag == '0')).all() data['attachs'] = [ { "name": row.file_name, "url": row.storage_file_name } for row in rows ] # 审批附件 rows = db.query(InfoPublishFile).filter(and_(InfoPublishFile.from_scenario=="infopublish_examine_attach_file", InfoPublishFile.foreign_key == info_id, InfoPublishFile.del_flag == '0')).all() data['examine_attachs'] = [ { "name": row.file_name, "url": row.storage_file_name } for row in rows ] data["examines"] = [] rows = db.query(InfoPublishExamine).filter(InfoPublishExamine.publish_id == info_id).filter(InfoPublishExamine.del_flag == '0').all() for row in rows: examine_time = row.examine_time data["examines"].append({ "examine_type": EXAMINE_TYPE_DICT[row.examine_type], "examine_sub_type": EXAMINE_SUB_TYPE_DICT[row.examine_sub_type], "content": row.content, "examine_time": get_datetime_str(row.examine_time), "user_id": row.user_id, "user_name": row.user_name, "nick_name": row.nick_name }) time_diff = examine_time - add_time data['process_time'] = get_process_time(time_diff) return { "code": 200, "msg": "查询成功", "data": data } # 信息发布编辑保存 @router.post('/edit') async def post_edit_info( request: Request, body = Depends(remove_xss_json), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): try: id = body['id'] remove_req_param(body, 'info_id') examines = body['examines'] remove_req_param(body, 'examines') body['recorded_by'] = user_id db.query(InfoPublishBase).filter(InfoPublishBase.id == id).update(body) db.commit() return { "code": 200, "msg": "保存信息成功" } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) # 信息发布提交审核 @router.post('/examine') async def post_examine_info( request: Request, body = Depends(remove_xss_json), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first() new_examine = InfoPublishExamine( publish_id = body['info_id'], examine_type = body['examine_type'], examine_sub_type = body['examine_sub_type'], content = body['content'], examine_time = datetime.now(), user_id = user_id, user_name = user_row.user_name, nick_name = user_row.nick_name ) db.add(new_examine) db.commit() return { "code": 200, "msg": "保存审批记录成功" } # 信息发布查看发送列表 @router.get("/sent_list") async def get_sent_list( info_id: str = Query('', description='信息ID'), channel: str = Query('', description='渠道'), keywords: str = Query('', description='关键字'), sort_by: str = Query('', description='排序字段'), sort_order: str = Query("asc", description='排序方式'), page: int = Query(1, gt=0, description='页码'), page_size: int = Query(10, gt=0, description='pageSize'), db: Session = Depends(get_db) ): try: # 应用查询条件 where = and_(InfoPublishResponses.publish_id == info_id) if channel != '': where = and_(where, InfoPublishResponses.publish_channel.like('%{}%'.format(channel))) # 计算总条目数 q = db.query(func.count(InfoPublishResponses.id)) q = q.filter(where) total = q.scalar() # 执行分页查询 q = db.query(InfoPublishResponses) q = q.filter(where) rows = q.order_by(InfoPublishResponses.id.desc()).offset((page - 1) * page_size).limit(page_size).all() data = [ { "user_id": row.user_id, "user_name": row.user_name, "nick_name": row.nick_name, "dept_name": row.dept_name, "sent_status": row.sent_status, "sent_time": get_datetime_str(row.sent_time), "response_type": get_response_type_text(row.response_type), "publish_channel": row.publish_channel, "yzy_account": row.yzy_account, "yuezhengyiFeedbackStatus": get_sent_status_text(row.sent_status), "haixinetFeedbackStatus": get_sent_status_text(0), } for row in rows ] # 返回结果 return { "code": 200, "msg": "查询成功", "data": data, "total": total } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) # 列出可用模板 @router.post("/template_list") def template_list(db: Session = Depends(get_db)): try: rows = db.query(InfoPublishTemplate).filter(InfoPublishTemplate.del_flag == '0').all() data = [ { "id": row.id, "name": row.name, "content": row.content } for row in rows ] return { "code": 200, "msg": "查询成功", "data": data, "total": len(data) } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) # 提交审批 @router.post("/submit_examine") async def submit_examine( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 删除之前的待审批记录 db.query(InfoPublishExamine).filter(and_(InfoPublishExamine.examine_type == 20, InfoPublishExamine.examine_sub_type == 20)).update({"del_flag": "2", "content": "content"}) db.commit() user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first() info_id = body['info_id'] examine_type = body['examine_type'] content = body['content'] # 审批通过 if examine_type == 'approved': new_examine = InfoPublishExamine( publish_id = info_id, examine_type = 20, examine_sub_type = 21, content = content, examine_time = datetime.now(), user_id = user_id, user_name = user_row.user_name, nick_name = user_row.nick_name ) db.add(new_examine) db.commit() # publish_status 发布中 # examine_status 审批通过 db.query(InfoPublishBase).filter(InfoPublishBase.id == info_id).update({"publish_status": 3, "examine_status": 3}) db.commit() # 审批不通过 elif examine_type == 'rejected': new_examine = InfoPublishExamine( publish_id = info_id, examine_type = 20, examine_sub_type = 22, content = content, examine_time = datetime.now(), user_id = user_id, user_name = user_row.user_name, nick_name = user_row.nick_name ) db.add(new_examine) db.commit() # publish_status 取消发布 # examine_status 审批不通过 db.query(InfoPublishBase).filter(InfoPublishBase.id == info_id).update({"publish_status": 9, "examine_status": 9}) db.commit() return { "code": 200, "msg": "审批成功" } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) def get_sent_status_text(val: int) -> str: if val == 0: return "待发送" elif val == 1: return '成功' elif val == 2: return '失败' else: return "未知" def get_response_type_text(val: int) -> str: if val == 0: return "仅需阅读" elif val == 1: return '点击确认' elif val == 2: return '签字确认' else: return '未知'