#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, HTTPException, Query from sqlalchemy.exc import IntegrityError from fastapi.responses import HTMLResponse, FileResponse from fastapi.responses import JSONResponse from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from models import * import json import os from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from common.db import db_event_management, db_user, db_area, db_emergency_plan from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta from common import YzyApi from common.db import db_dict from urllib.parse import quote import base64 from io import BytesIO from PIL import Image from config import settings router = APIRouter() # 信息发布分页(我能看的内容,小屏) @router.get('/list') async def get_publish_list( search_keyword: str = Query('', description='信息内容'), page: int = Query(1, gt=0, description='页码'), begin_time_s: str = Query(None, description='开始时间'), end_time_s: str = Query(None, description='结束时间'), info_order: str = Query("desc", description='时间排序'), page_size: int = Query(10, gt=0, description='pageSize'), db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 应用查询条件 where = and_(InfoPublishBase.del_flag == '0', InfoPublishBase.publish_status == 4) if search_keyword != '': where = and_(where, InfoPublishBase.content.like('%{}%'.format(search_keyword))) if begin_time_s != None: begin_time = datetime.strptime(begin_time_s, "%Y-%m-%d") where = and_(where, InfoPublishBase.publish_time >= begin_time) if end_time_s != None: end_time = datetime.strptime(end_time_s, "%Y-%m-%d") + timedelta(days=1) where = and_(where, InfoPublishBase.publish_time < end_time) print(where) subquery = db.query(InfoPublishResponses.publish_id).filter(InfoPublishResponses.user_id == user_id).subquery() # 计算总条目数 q = db.query(func.count(InfoPublishBase.id)) q = q.filter(where).filter(InfoPublishBase.id == subquery.c.publish_id) total = q.scalar() # 执行分页查询 q = db.query(InfoPublishBase) q = q.filter(where).filter(InfoPublishBase.id == subquery.c.publish_id) if info_order == 'desc': q.order_by(InfoPublishBase.publish_time.desc()) if info_order == 'asc': q.order_by(InfoPublishBase.publish_time.asc()) rows = q.offset((page - 1) * page_size).limit(page_size).all() data = [] for row in rows: # 发布申请人 recorded_by = row.recorded_by user_row = db.query(SysUser).filter(SysUser.user_id == recorded_by).first() nick_name = "" dept_name = "" if user_row is not None: nick_name = user_row.nick_name dept_id = user_row.dept_id dept_row = db.query(SysDept).filter(SysDept.dept_id == dept_id).first() if dept_row is not None: dept_name = dept_row.dept_name # 待处理人 examine_user = "无" examine_by = row.examine_by user_row = db.query(SysUser).filter(SysUser.user_id == examine_by).first() if user_row is not None: examine_user = user_row.nick_name data.append({ "id": row.id, "title": row.title, "info_type": row.info_type, "publish_group": row.publish_group, "content": row.content, "publish_time": get_datetime_str(row.publish_time), "publish_channel": row.publish_channel, "nick_name": nick_name, "dept_name": dept_name, "examine_user": examine_user, "publish_status": db_dict.get_dict_label(db, "mm_publish_status", row.publish_status), "examine_status": db_dict.get_dict_label(db, "mm_examine_status", row.examine_status), "user_count": row.user_count, "user_ok_count": row.user_ok_count, "user_err_count": row.user_err_count, "user_sending_count": row.user_sending_count, "is_my_edit": (row.examine_status == 0 or row.examine_status == 9) and row.recorded_by == user_id, # 是否我的编辑事项 "is_my_examine": row.examine_status == 1 and int(row.examine_by) == user_id # 是否我的审批事项 }) # 返回结果 return { "code": 200, "msg": "查询成功", "data": data, "total": total } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.get('/detail') async def get_info_detail( request: Request, id: str = Query(None, description='信息编号'), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): try: # 构建查询 query = db.query(InfoPublishBase) query = query.filter(InfoPublishBase.id == id) # 执行查询 row = query.first() if row is not None: response_row = db.query(InfoPublishResponses).filter(and_(InfoPublishResponses.publish_id == id, InfoPublishResponses.user_id == user_id)).first() data = { "id": row.id, "title": row.title, "info_type": row.info_type, "publish_group": row.publish_group, "content": row.content, "publish_time": get_datetime_str(row.publish_time), "publish_channel": row.publish_channel, "response_type": response_row.response_type, "response_status": response_row.response_status, "response_time": get_datetime_str(response_row.response_time), "signature": get_signature_base64(db, response_row.id) } # 附件 rows = db.query(InfoPublishFile).filter(and_(InfoPublishFile.from_scenario=="infopublish_attach_file", InfoPublishFile.foreign_key == id, InfoPublishFile.del_flag == '0')).all() data['files'] = [ { "name": row.file_name, "url": row.storage_file_name } for row in rows ] return { "code": 200, "msg": "查询成功", "data": data } else: return { "code": 500, "msg": "查询失败" } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.post("/confirmReceived") async def confirmReceived( request: Request, body = Depends(remove_xss_json), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): try: id = get_req_param(body, "id") row = db.query(InfoPublishResponses).filter(and_(InfoPublishResponses.publish_id == id, InfoPublishResponses.user_id == user_id)).first() if row is None: return { "code": 500, "msg": "查询失败" } row.response_time = datetime.now() row.response_status = 1 # 已确认 db.commit() return { "code": 200, "msg": "反馈成功", "data": { "response_status": row.response_status } } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) # 确定签名 @router.post("/confirmSignature") async def confirmSignature( request: Request, body = Depends(remove_xss_json), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): try: id = get_req_param(body, "id") row = db.query(InfoPublishResponses).filter(and_(InfoPublishResponses.publish_id == id, InfoPublishResponses.user_id == user_id)).first() if row is None: return { "code": 500, "msg": "查询失败" } response_id = row.id file_name = new_guid() + ".png" file_path = f'/data/upload/mergefile/uploads/{file_name}' base64_data = get_req_param(body, "image") base64_data = base64_data.replace("data:image/png;base64,", "") binary_data = base64.b64decode(base64_data) bytes_io = BytesIO(binary_data) image = Image.open(bytes_io) image.save(file_path) new_file = InfoPublishFile( file_name="信息阅读确认签名.png", storage_file_name=file_name, file_path=f'/data/upload/mergefile/uploads/{file_name}', file_size=os.path.getsize(f'/data/upload/mergefile/uploads/{file_name}'), foreign_key=str(response_id), from_scenario="infopublish_signature_file", update_time=datetime.now(), create_time=datetime.now(), create_by=user_id, create_dept=0, del_flag='0', status=0, ) db.add(new_file) row.response_time = datetime.now() row.response_status = 2 # 已签名 db.commit() return { "code": 200, "msg": "反馈成功", "data": { "response_status": row.response_status } } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) def image2base64(img_file: str): f = open(img_file, 'rb') img_raw_data = f.read() f.close() img_b64_string = base64.b64encode(img_raw_data) return img_b64_string.decode('ascii') def get_signature_base64(db: Session, response_id: int): row = db.query(InfoPublishFile).filter(and_(InfoPublishFile.from_scenario == 'infopublish_signature_file', InfoPublishFile.foreign_key == str(response_id))).first() if row is None: return "" return "data:image/png;base64," + image2base64(row.file_path)