#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, HTTPException, Query from sqlalchemy.exc import IntegrityError from fastapi.responses import HTMLResponse, FileResponse from fastapi.responses import JSONResponse from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from models import * import json import os from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from common.db import db_event_management, db_user, db_area, db_msg_center, db_yzy from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta from common import YzyApi from common.db import db_dict from urllib.parse import quote import base64 from io import BytesIO from PIL import Image from config import settings from utils.riskManagement_uitl import get_task_title_by_type, get_rescue_resources_task_title_by_type router = APIRouter() # 消息中心 @router.post("/unread_msg_count") async def msg_count( request: Request, body = Depends(remove_xss_json), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): try: msg_types = get_req_param(body, "msg_types") msg_type_list = msg_types.split(",") data = db_msg_center.get_unread_msg_count(db, user_id, msg_type_list) return { "code": 200, "msg": "反馈成功", "data": data } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.post("/send_yzy_msg") async def send_yzy_msg(request: Request, body = Depends(remove_xss_json), db: Session = Depends(get_db), user_id = Depends(valid_access_token)): msg_type = body['msg_type'] detail_url = body['detail_url'] description = body['description'] foreign_key = '0' if 'foreign_key' in body: foreign_key = body['foreign_key'] from_scenario = '' if 'from_scenario' in body: from_scenario = body['from_scenario'] user_list = [] tousers = body['tousers'] for to_user_id in tousers: user_info = db_user.get_user_info(db, to_user_id) yzy_account = user_info.yzy_account yzy_userid = db_yzy.get_userid_by_account(db, yzy_account) if yzy_userid not in user_list: data = { "yzy_userid": yzy_userid, "mobile": yzy_account, "content": description, "recorded_by": user_id, "detail_url": settings.YZY_WEB_ROOT + detail_url, "foreign_key": foreign_key, "from_scenario": from_scenario, "title": msg_type } YzyApi.add_to_msg_queue(db, data) db_msg_center.add_msg(db, msg_type, foreign_key, to_user_id) user_list.append(yzy_userid) return { "code": 200, "msg": "消息暂存成功" } @router.get('/list') async def get_center_list( search_keyword: str = Query('', description='信息内容'), info_type: str = Query('', description='类型'), page: int = Query(1, gt=0, description='页码'), begin_time_s: str = Query(None, description='开始时间'), end_time_s: str = Query(None, description='结束时间'), info_order: str = Query("desc", description='时间排序'), page_size: int = Query(10, gt=0, description='pageSize'), db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 应用查询条件 where = and_(MsgCenter.del_flag == '0', MsgCenter.recv_userid == user_id) if search_keyword != '': where = and_(where, MsgCenter.content.like('%{}%'.format(search_keyword))) if info_type != '': where = and_(where, MsgCenter.msg_type == info_type) if begin_time_s != None: begin_time = datetime.strptime(begin_time_s, "%Y-%m-%d") where = and_(where, MsgCenter.recv_time >= begin_time) if end_time_s != None: end_time = datetime.strptime(end_time_s, "%Y-%m-%d") + timedelta(days=1) where = and_(where, MsgCenter.recv_time < end_time) print(where) # 计算总条目数 q = db.query(func.count(MsgCenter.id)) q = q.filter(where) total = q.scalar() # 执行分页查询 q = db.query(MsgCenter) q = q.filter(where) if info_order == 'desc': q = q.order_by(MsgCenter.recv_time.desc()) if info_order == 'asc': q = q.order_by(MsgCenter.recv_time.asc()) rows = q.offset((page - 1) * page_size).limit(page_size).all() data = [] for row in rows: foreign_key = row.foreign_key from_scenario = row.from_scenario detail = {} if row.msg_type == '任务消息': try: task_info = db.query(TaskRegistration).filter(TaskRegistration.task_id == foreign_key).first() event_code = task_info.event_code event_title = db_event_management.get_event_title(db, event_code) detail = { "event_title": event_title, "creation_time": get_datetime_str(task_info.creation_time), "task_description": task_info.task_description, "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '预案通知': try: notify_info = db.query(EventEmergencyNotify).filter(EventEmergencyNotify.id == foreign_key).first() event_id = notify_info.event_id plan_id = notify_info.plan_id sent_time = notify_info.sent_time plan_info = db.query(EmergencyPlan).filter(EmergencyPlan.plan_number == plan_id).first() plan_name = plan_info.plan_name event_info = db.query(EventBase).filter(EventBase.event_code == event_id).first() response_level = db_dict.get_dict_label(db, "response_level", event_info.response_level) detail = { "title": row.title, "organizing_unit": plan_info.organizing_unit, # 发布单位 "plan_name": plan_name, # 预案名称 "response_level": response_level, # 相应级别 "sent_time": get_datetime_str(sent_time), # 发送时间 "yzy_content": notify_info.yzy_content, # 消息内容 "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '在线点名': try: call_info = db.query(OnlineRollCallDetail).filter(OnlineRollCallDetail.id == foreign_key).first() detail = { "call_title": row.content, "create_time": get_datetime_str(call_info.create_time), # 发送时间 "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '事件接报': try: event_info = db.query(EventBase).filter(EventBase.event_code == foreign_key).first() detail = { "event_title": row.title, "address": event_info.address, "event_level": db_dict.get_dict_label(db, "mm_event_level", event_info.event_level), "event_time": get_datetime_str(event_info.event_time), # 事发时间 "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '隐患巡查': try: # task_info = db.query(RiskManagementInspectionTask).filter(RiskManagementInspectionTask.task_number == foreign_key).first() # task_title = get_task_title_by_type(task_info.inspection_business) detail = { "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '风险排查': try: # task_info = db.query(RiskManagementRiskTask).filter(RiskManagementRiskTask.task_number == foreign_key).first() # task_title = db_dict.get_dict_label(db, 'risk_type', task_info.risk_type) detail = { "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '数据采集': try: # task_info = db.query(RiskManagementRescueResourcesTask).filter(RiskManagementRescueResourcesTask.task_number == foreign_key).first() # task_title = get_rescue_resources_task_title_by_type(task_info.type) detail = { "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '值班消息': try: detail = { "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '审批消息': try: detail = { "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() elif row.msg_type == '系统消息': try: detail = { "detail_url": "/infoDetails?id="+str(row.id) } except: traceback.print_exc() # 信息发布 elif row.msg_type in ['预警信息', '灾情信息', '灾情信息', '指挥救援', '公众防范']: try: info = db.query(InfoPublishBase).filter(InfoPublishBase.id == foreign_key).first() detail = { "detail_url": "/infoDetails?id="+str(info.id) } except: traceback.print_exc() data.append({ "id": row.id, "msg_type": row.msg_type, "title": row.title, "content": row.content, "recv_time": get_datetime_str(row.recv_time), "detail": detail }) # 返回结果 return { "code": 200, "msg": "查询成功", "data": data, "total": total } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e))