#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Info-publish endpoints: recipient list/detail, read confirmation and
signature upload, plus the "my work approval" list/detail/confirm flow."""

from fastapi import APIRouter, Request, Depends, HTTPException, Query
from sqlalchemy.exc import IntegrityError
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.responses import JSONResponse
from database import get_db
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
# NOTE(review): `func` (used for COUNT below) is not imported explicitly in
# this file; presumably it is re-exported by one of the star imports - confirm.
from models import *
import json
import os
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_event_management, db_user, db_area, db_emergency_plan
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
from common import YzyApi
from common.db import db_dict
from urllib.parse import quote
import base64
from io import BytesIO
from PIL import Image
from config import settings


router = APIRouter()

# Display labels for InfoPublishExamine.examine_type.
EXAMINE_TYPE_DICT = {
    0: "草稿",
    10: "提交",
    20: "领导审批",
    30: "重新提交",
    40: "转交"
}

# Display labels for InfoPublishExamine.examine_sub_type.
EXAMINE_SUB_TYPE_DICT = {
    0: "草稿",
    10: "提交",
    20: "待审批",
    21: "审批通过",
    22: "审批不通过",
    30: "重新提交",
    40: "转交"
}


def _get_applicant_names(db: Session, recorded_by):
    """Return ``(nick_name, dept_name)`` for the user who recorded an item.

    Either element is ``""`` when the user or the department row is missing.
    """
    nick_name = ""
    dept_name = ""
    user_row = db.query(SysUser).filter(SysUser.user_id == recorded_by).first()
    if user_row is not None:
        nick_name = user_row.nick_name
        dept_row = db.query(SysDept).filter(SysDept.dept_id == user_row.dept_id).first()
        if dept_row is not None:
            dept_name = dept_row.dept_name
    return nick_name, dept_name


def _get_examiner_name(db: Session, examine_by):
    """Return the nick name of the pending handler, or "无" when there is none."""
    user_row = db.query(SysUser).filter(SysUser.user_id == examine_by).first()
    return user_row.nick_name if user_row is not None else "无"


def _has_pending_examine(db: Session, publish_id, user_id) -> bool:
    """Tell whether ``user_id`` has a live pending-approval record for ``publish_id``."""
    pending = db.query(InfoPublishExamine.id).filter(
        InfoPublishExamine.del_flag == "0",
        InfoPublishExamine.publish_id == publish_id,
        InfoPublishExamine.user_id == user_id,
        InfoPublishExamine.examine_sub_type == 20,
    ).first()
    return pending is not None


def _build_list_item(db: Session, row, user_id, is_my_examine):
    """Serialize one InfoPublishBase row for the list endpoints."""
    nick_name, dept_name = _get_applicant_names(db, row.recorded_by)
    return {
        "id": row.id,
        "title": row.title,
        "info_type": row.info_type,
        "publish_group": row.publish_group,
        "content": row.content,
        "publish_time": get_datetime_str(row.publish_time),
        "add_time": row.add_time.strftime("%Y-%m-%d %H:%M"),
        "publish_channel": row.publish_channel,
        "nick_name": nick_name,
        "dept_name": dept_name,
        "examine_user": _get_examiner_name(db, row.examine_by),
        "publish_status": db_dict.get_dict_label(db, "mm_publish_status", row.publish_status),
        "examine_status": db_dict.get_dict_label(db, "mm_examine_status", row.examine_status),
        "user_count": row.user_count,
        "user_ok_count": row.user_ok_count,
        "user_err_count": row.user_err_count,
        "user_sending_count": row.user_sending_count,
        # Whether this is an item the caller may edit.
        "is_my_edit": (row.examine_status == 0 or row.examine_status == 9) and row.recorded_by == user_id,
        # Whether this is an item the caller has to approve.
        "is_my_examine": is_my_examine
    }


def _build_detail_base(db: Session, row):
    """Serialize the fields shared by both detail endpoints."""
    nick_name, dept_name = _get_applicant_names(db, row.recorded_by)
    return {
        "id": row.id,
        "title": row.title,
        "info_type": row.info_type,
        "add_time": row.add_time.strftime("%Y-%m-%d %H:%M"),
        "nick_name": nick_name,
        "dept_name": dept_name,
        "publish_group": row.publish_group,
        "content": row.content,
        "publish_time": get_datetime_str(row.publish_time),
        "publish_channel": row.publish_channel,
    }


def _get_attachments(db: Session, info_id):
    """Return the non-deleted attachment files of an info item as name/url dicts."""
    file_rows = db.query(InfoPublishFile).filter(and_(
        InfoPublishFile.from_scenario == "infopublish_attach_file",
        InfoPublishFile.foreign_key == info_id,
        InfoPublishFile.del_flag == '0')).all()
    return [
        {"name": file_row.file_name, "url": file_row.storage_file_name}
        for file_row in file_rows
    ]


def _get_examine_history(db: Session, info_id):
    """Return the non-deleted approval records of an info item."""
    examine_rows = db.query(InfoPublishExamine).filter(
        InfoPublishExamine.publish_id == info_id).filter(
        InfoPublishExamine.del_flag == '0').all()
    return [
        {
            "examine_type": EXAMINE_TYPE_DICT[examine_row.examine_type],
            "examine_sub_type": EXAMINE_SUB_TYPE_DICT[examine_row.examine_sub_type],
            "content": examine_row.content,
            "examine_time": get_datetime_str(examine_row.examine_time),
            "user_id": examine_row.user_id,
            "user_name": examine_row.user_name,
            "nick_name": examine_row.nick_name
        }
        for examine_row in examine_rows
    ]


# Paginated list of published info (items visible to the caller, small-screen client).
# Info publishing has been superseded by the message center, so this endpoint
# may no longer see much use (2024/12/26).
@router.get('/list')
async def get_publish_list(
    search_keyword: str = Query('', description='信息内容'),
    info_type: str = Query('', description='类型'),
    page: int = Query(1, gt=0, description='页码'),
    begin_time_s: Optional[str] = Query(None, description='开始时间'),
    end_time_s: Optional[str] = Query(None, description='结束时间'),
    info_order: str = Query("desc", description='时间排序'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """List published items (publish_status == 4) addressed to the caller.

    Optional filters: content keyword, info type, and an inclusive
    ``%Y-%m-%d`` publish-date range. ``info_order`` ("desc"/"asc") sorts by
    publish time. Returns ``{"code", "msg", "data", "total"}``; any failure is
    reported as HTTP 500.
    """
    try:
        # IN (subquery) instead of an implicit join so that several response
        # rows for the same item cannot duplicate it or inflate the total.
        my_publish_ids = db.query(InfoPublishResponses.publish_id).filter(
            InfoPublishResponses.user_id == user_id)
        conditions = [
            InfoPublishBase.del_flag == '0',
            InfoPublishBase.publish_status == 4,
            InfoPublishBase.id.in_(my_publish_ids),
        ]
        if search_keyword != '':
            conditions.append(InfoPublishBase.content.like('%{}%'.format(search_keyword)))
        if info_type != '':
            conditions.append(InfoPublishBase.info_type == info_type)
        if begin_time_s is not None:
            begin_time = datetime.strptime(begin_time_s, "%Y-%m-%d")
            conditions.append(InfoPublishBase.publish_time >= begin_time)
        if end_time_s is not None:
            # Exclusive upper bound on the next day keeps the end date inclusive.
            end_time = datetime.strptime(end_time_s, "%Y-%m-%d") + timedelta(days=1)
            conditions.append(InfoPublishBase.publish_time < end_time)

        # Total number of matching items.
        total = db.query(func.count(InfoPublishBase.id)).filter(*conditions).scalar()

        # Page query. order_by() returns a new Query, so it must be re-assigned.
        q = db.query(InfoPublishBase).filter(*conditions)
        if info_order == 'desc':
            q = q.order_by(InfoPublishBase.publish_time.desc())
        elif info_order == 'asc':
            q = q.order_by(InfoPublishBase.publish_time.asc())
        rows = q.offset((page - 1) * page_size).limit(page_size).all()

        data = []
        for row in rows:
            # examine_by may be NULL; guard before converting it for comparison.
            is_my_examine = (
                row.examine_status == 1
                and row.examine_by is not None
                and int(row.examine_by) == user_id
            )
            data.append(_build_list_item(db, row, user_id, is_my_examine))

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e


# NOTE(review): a later handler in this module is also named
# `get_info_detail`; both routes are registered by their decorators, but the
# module-level name ends up bound to the later one.
@router.get('/detail')
async def get_info_detail(
    request: Request,
    id: str = Query(None, description='信息编号'),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)):
    """Return one info item with the caller's response state, attachments and approval history.

    Responds with ``{"code": 500, "msg": "查询失败"}`` when the item does not
    exist. When the caller has no response row for the item, the response
    fields are returned empty instead of failing.
    """
    try:
        row = db.query(InfoPublishBase).filter(InfoPublishBase.id == id).first()
        if row is None:
            return {
                "code": 500,
                "msg": "查询失败"
            }

        data = _build_detail_base(db, row)

        response_row = db.query(InfoPublishResponses).filter(and_(
            InfoPublishResponses.publish_id == id,
            InfoPublishResponses.user_id == user_id)).first()
        if response_row is not None:
            data["response_type"] = response_row.response_type
            data["response_status"] = response_row.response_status
            data["response_time"] = get_datetime_str(response_row.response_time)
            data["signature"] = get_signature_base64(db, response_row.id)
        else:
            # The caller is not a recipient of this item.
            data["response_type"] = None
            data["response_status"] = None
            data["response_time"] = ""
            data["signature"] = ""

        # Whether the caller has a pending approval for this item.
        data["is_my_examine"] = 1 if _has_pending_examine(db, id, user_id) else 0

        # Attachments.
        data['files'] = _get_attachments(db, id)
        # Approval progress.
        data["examines"] = _get_examine_history(db, id)

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.post("/confirmReceived")
async def confirmReceived(
    request: Request,
    body = Depends(remove_xss_json),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)):
    """Mark the caller's response for info item ``body["id"]`` as confirmed (status 1)."""
    try:
        publish_id = get_req_param(body, "id")
        row = db.query(InfoPublishResponses).filter(and_(
            InfoPublishResponses.publish_id == publish_id,
            InfoPublishResponses.user_id == user_id)).first()
        if row is None:
            return {
                "code": 500,
                "msg": "查询失败"
            }

        row.response_time = datetime.now()
        row.response_status = 1  # confirmed
        db.commit()

        return {
            "code": 200,
            "msg": "反馈成功",
            "data": {
                "response_status": row.response_status
            }
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e


# Confirm with signature.
@router.post("/confirmSignature")
async def confirmSignature(
    request: Request,
    body = Depends(remove_xss_json),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)):
    """Store the caller's signature image and mark the response as signed (status 2).

    ``body["image"]`` is a base64 PNG, optionally prefixed with
    ``data:image/png;base64,``. The image is written to the upload directory
    and registered as an InfoPublishFile linked to the response row.
    """
    try:
        publish_id = get_req_param(body, "id")
        row = db.query(InfoPublishResponses).filter(and_(
            InfoPublishResponses.publish_id == publish_id,
            InfoPublishResponses.user_id == user_id)).first()
        if row is None:
            return {
                "code": 500,
                "msg": "查询失败"
            }

        response_id = row.id
        file_name = new_guid() + ".png"
        file_path = f'/data/upload/mergefile/uploads/{file_name}'

        base64_data = get_req_param(body, "image")
        base64_data = base64_data.replace("data:image/png;base64,", "")
        binary_data = base64.b64decode(base64_data)
        # Round-trip through Pillow so that only decodable image data is stored.
        with Image.open(BytesIO(binary_data)) as image:
            image.save(file_path)

        now = datetime.now()
        new_file = InfoPublishFile(
            file_name="信息阅读确认签名.png",
            storage_file_name=file_name,
            file_path=file_path,
            file_size=os.path.getsize(file_path),
            foreign_key=str(response_id),
            from_scenario="infopublish_signature_file",
            update_time=now,
            create_time=now,
            create_by=user_id,
            create_dept=0,
            del_flag='0',
            status=0,
        )
        db.add(new_file)

        row.response_time = datetime.now()
        row.response_status = 2  # signed
        db.commit()

        return {
            "code": 200,
            "msg": "反馈成功",
            "data": {
                "response_status": row.response_status
            }
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e


def image2base64(img_file: str):
    """Read ``img_file`` and return its content as an ASCII base64 string."""
    # The context manager closes the handle even when read() raises.
    with open(img_file, 'rb') as f:
        img_raw_data = f.read()
    return base64.b64encode(img_raw_data).decode('ascii')


def get_signature_base64(db: Session, response_id: int):
    """Return the signature stored for a response row as a PNG data URL, or "" if none exists."""
    row = db.query(InfoPublishFile).filter(and_(
        InfoPublishFile.from_scenario == 'infopublish_signature_file',
        InfoPublishFile.foreign_key == str(response_id))).first()
    if row is None:
        return ""

    return "data:image/png;base64," + image2base64(row.file_path)


# My work approvals.
@router.get("/work_approval/list")
async def work_approval_list(
    search_keyword: str = Query('', description='信息内容'),
    status: str = Query('1', description='状态'),
    info_type: Optional[str] = Query(None, description='信息类型'),
    time_type: Optional[str] = Query(None, description='时间类型'),
    info_order: str = Query("desc", description='时间排序'),
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """List info items in the caller's approval queue.

    ``status`` "1" selects items with a pending approval record for the
    caller, "2" items the caller has approved or rejected; any other value
    applies no approval filter. ``time_type`` is accepted but currently
    unused. ``info_order`` ("desc"/"asc") sorts by creation time.
    """
    try:
        conditions = [InfoPublishBase.del_flag == '0']

        my_examines = db.query(InfoPublishExamine.publish_id).filter(
            InfoPublishExamine.del_flag == "0").filter(
            InfoPublishExamine.user_id == user_id)
        # IN (subquery) rather than an implicit join: one item can carry
        # several approval records by the same user, which would otherwise
        # duplicate it in the page and inflate the total.
        if status == "1":  # to do
            conditions.append(InfoPublishBase.id.in_(
                my_examines.filter(InfoPublishExamine.examine_sub_type == 20)))
        if status == "2":  # done
            conditions.append(InfoPublishBase.id.in_(
                my_examines.filter(InfoPublishExamine.examine_sub_type.in_([21, 22]))))

        if search_keyword != '':
            conditions.append(InfoPublishBase.content.like('%{}%'.format(search_keyword)))
        if info_type is not None:
            conditions.append(InfoPublishBase.info_type == info_type)

        # Total number of matching items.
        total = db.query(func.count(InfoPublishBase.id)).filter(*conditions).scalar()

        # Page query. order_by() returns a new Query, so it must be re-assigned.
        q = db.query(InfoPublishBase).filter(*conditions)
        if info_order == 'desc':
            q = q.order_by(InfoPublishBase.add_time.desc())
        elif info_order == 'asc':
            q = q.order_by(InfoPublishBase.add_time.asc())
        rows = q.offset((page - 1) * page_size).limit(page_size).all()

        data = []
        for row in rows:
            # Pending approval is evaluated for this specific item.
            is_my_examine = 1 if _has_pending_examine(db, row.id, user_id) else 0
            data.append(_build_list_item(db, row, user_id, is_my_examine))

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e


# NOTE(review): this reuses the name of the '/detail' handler and therefore
# rebinds the module-level name; kept unchanged so existing references to
# `get_info_detail` keep resolving to this function.
@router.get('/work_approval/detail')
async def get_info_detail(
    request: Request,
    id: str = Query(None, description='信息编号'),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)):
    """Return one info item for the approval view, with attachments and approval history.

    ``is_my_examine`` is 1 only when the caller has a pending approval record
    for this item and the item has not been published yet.
    """
    try:
        row = db.query(InfoPublishBase).filter(InfoPublishBase.id == id).first()
        if row is None:
            return {
                "code": 500,
                "msg": "查询失败"
            }

        data = _build_detail_base(db, row)

        is_my_examine = 1 if _has_pending_examine(db, id, user_id) else 0
        # Once published, the item can no longer be modified.
        if row.publish_status == 4:
            is_my_examine = 0
        data["is_my_examine"] = is_my_examine

        # Attachments.
        data['files'] = _get_attachments(db, id)
        # Approval progress.
        data["examines"] = _get_examine_history(db, id)

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e)) from e


# Approval decision.
@router.post("/work_approval/confirm")
async def work_approval_confirm(
    request: Request,
    body = Depends(remove_xss_json),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)):
    """Record the caller's approval decision for an info item.

    Body keys: ``info_id``, ``examine_type`` ("approved" or "rejected"),
    ``content`` (approval remark) and ``action`` (echoed back in ``data``).
    The item's pending-approval records are soft-deleted, a decision record
    is added and the item's publish/examine status is updated, all in a
    single transaction.
    """
    try:
        action = get_req_param(body, "action")
        info_id = body['info_id']
        examine_type = body['examine_type']
        content = body['content']

        # examine_type -> (examine_sub_type, publish_status and examine_status value)
        #   approved: sub type 21, status 3 (publishing / approved)
        #   rejected: sub type 22, status 9 (publish cancelled / rejected)
        outcome = {'approved': (21, 3), 'rejected': (22, 9)}.get(examine_type)
        if outcome is None:
            # Reject unknown decisions before touching any data.
            return {
                "code": 500,
                "msg": "参数错误"
            }
        examine_sub_type, new_status = outcome

        # NOTE(review): nothing here verifies that the caller actually holds
        # the pending approval for this item - confirm whether that is
        # enforced elsewhere.
        user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first()

        # Soft-delete the pending-approval records of THIS item only.
        db.query(InfoPublishExamine).filter(and_(
            InfoPublishExamine.publish_id == info_id,
            InfoPublishExamine.examine_type == 20,
            InfoPublishExamine.examine_sub_type == 20,
        )).update({"del_flag": "2"}, synchronize_session=False)

        db.add(InfoPublishExamine(
            publish_id = info_id,
            examine_type = 20,
            examine_sub_type = examine_sub_type,
            content = content,
            examine_time = datetime.now(),
            user_id = user_id,
            user_name = user_row.user_name,
            nick_name = user_row.nick_name
        ))

        db.query(InfoPublishBase).filter(InfoPublishBase.id == info_id).update(
            {"publish_status": new_status, "examine_status": new_status},
            synchronize_session=False)
        db.commit()

        return {
            "code": 200,
            "msg": "审批成功",
            "data": action
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/work_approval/redirect")
async def WorkApprovalRedirect(
    request: Request,
    body = Depends(remove_xss_json),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)):
    """Hand the pending approval of an info item over to another user.

    Body keys: ``info_id`` (the item) and ``redirect_id`` (the user who takes
    over). The item's pending-approval records are soft-deleted, a "transfer"
    record is written for the caller and a new pending-approval record is
    created for the target user, all in a single transaction.
    """
    try:
        info_id = get_req_param(body, "info_id")
        redirect_id = get_req_param(body, "redirect_id")

        user_row = db.query(SysUser).filter(SysUser.user_id == user_id).first()
        target_row = db.query(SysUser).filter(SysUser.user_id == redirect_id).first()
        if target_row is None:
            # Refuse before touching any data: there is nobody to hand over to.
            return {
                "code": 500,
                "msg": "转交失败,用户不存在"
            }

        # Soft-delete the pending-approval records of THIS item only.
        db.query(InfoPublishExamine).filter(and_(
            InfoPublishExamine.publish_id == info_id,
            InfoPublishExamine.examine_type == 20,
            InfoPublishExamine.examine_sub_type == 20,
        )).update({"del_flag": "2"}, synchronize_session=False)

        # Transfer record, attributed to the caller.
        db.add(InfoPublishExamine(
            publish_id = body['info_id'],
            examine_type = 40,      # transfer
            examine_sub_type = 40,  # transfer
            content = "转交",
            examine_time = datetime.now(),
            user_id = user_id,
            user_name = user_row.user_name,
            nick_name = user_row.nick_name
        ))

        # New pending approval, owned by the target user so that it shows up
        # in the target's to-do list.
        # NOTE(review): InfoPublishBase.examine_by is not updated here -
        # confirm whether the list's "pending handler" should follow the transfer.
        db.add(InfoPublishExamine(
            publish_id = body['info_id'],
            examine_type = 20,
            examine_sub_type = 20,
            content = "待审批",
            examine_time = datetime.now(),
            user_id = target_row.user_id,
            user_name = target_row.user_name,
            nick_name = target_row.nick_name
        ))
        db.commit()

        return {
            "code": 200,
            "msg": "转交成功"
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e)) from e