"""Task registration / feedback HTTP endpoints (FastAPI router)."""
# NOTE: import order is kept as in the original on purpose: several
# star-imports below may re-export names, so reordering could change which
# definition wins.
import json
import random
from typing import Optional
from sqlalchemy import create_engine, select
from utils.StripTagsHTMLParser import *
from sqlalchemy.orm import Session
from sqlalchemy import text, exists, and_, or_, not_
import traceback
from models import *
from common.db import db_task
from common.security import valid_access_token
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Request, Response, Query
from database import get_db
from pydantic import BaseModel
from utils import *
import copy
from config import settings
from common import YzyApi
from common.db import db_event_management, db_user, db_msg_center, db_yzy, db_dept

router = APIRouter()


def _save_task_files(db: Session, files, foreign_key, from_scenario: str,
                     url_key: str = 'url', name_key: str = 'name') -> None:
    """Persist one TaskFile row per entry of *files*, then commit.

    Args:
        db: Active SQLAlchemy session.
        files: Iterable of dicts; each must contain *url_key*, *name_key*
            and ``'status'``.
        foreign_key: Value stored in ``TaskFile.foreign_key``.
        from_scenario: Value stored in ``TaskFile.from_scenario``.
        url_key: Dict key whose value becomes ``file_name``.
        name_key: Dict key whose value becomes ``file_name_desc``.

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """
    for file in files:
        file_name = file[url_key]
        file_name_desc = file[name_key]
        status = file['status']
        db.add(TaskFile(
            file_id=new_guid(),
            foreign_key=foreign_key,
            from_scenario=from_scenario,
            file_name=file_name,
            file_name_desc=file_name_desc,
            status=status,
        ))
    # A commit is issued even when *files* is empty, as the original did.
    db.commit()


@router.post('/create')
async def create_task(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create a task for an existing event, store its attachments and notify.

    The request body must contain ``task_description``, ``unit_name``,
    ``registrar`` and ``event_code``; optional ``imgList`` / ``fileList``
    entries are stored as TaskFile rows. All remaining body keys are passed
    straight to the ``TaskRegistration`` constructor.

    Raises:
        HTTPException: 401 when required fields are missing, 500 on any
            other failure (after rolling back the session).
    """
    try:
        # Validate required fields.
        required_fields = ['task_description', 'unit_name', 'registrar', 'event_code']
        missing_fields = [field for field in required_fields if field not in body]
        if missing_fields:
            raise HTTPException(status_code=401, detail=f"Missing required fields: {', '.join(missing_fields)}")

        event_code = db.query(EventBase).filter(EventBase.event_code == body.get('event_code')).first()
        if not event_code:
            return Response(content="事件不存在", status_code=400)

        unit_id = db_dept.get_dept_id_by_name(db, body['unit_name'])
        task_id = db_task.get_next_event_id(db)

        # Attachment lists are not TaskRegistration columns: strip them from
        # the body before it is expanded into the model constructor.
        imgList = body.pop('imgList', [])
        fileList = body.pop('fileList', [])

        task_base = TaskRegistration(
            **body,
            task_id=task_id,
            unit_id=unit_id,
            registrar_id=user_id
        )
        db.add(task_base)
        db.commit()
        db.refresh(task_base)

        _save_task_files(db, imgList, task_base.id, 'task_img')
        _save_task_files(db, fileList, task_base.id, 'task_file')

        # Send the Yuezhengyi (YZY) message.
        send_yzy_msg(db, task_base, user_id)

        return {
            "code": 200,
            "msg": "任务创建成功",
            "data": task_id
        }
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 401 above) reach the client
        # instead of being rewritten to a 500 by the generic handler below.
        raise
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


# Send the Yuezhengyi (YZY) message.
def send_yzy_msg(db: Session, task_base: TaskRegistration, recorded_by: int) -> None:
    """Queue a YZY message and a message-center entry for the task executor.

    Args:
        db: Active SQLAlchemy session.
        task_base: The task the notification is about.
        recorded_by: Id stored as ``recorded_by`` on the queued message.
    """
    to_user_id = task_base.executor_id
    user_info = db_user.get_user_info(db, to_user_id)
    yzy_account = user_info.yzy_account
    yzy_userid = db_yzy.get_userid_by_account(db, yzy_account)
    event_title = db_event_management.get_event_title(db, task_base.event_code)
    create_time = get_datetime_str(task_base.creation_time)

    detail_url = "{}{}".format(settings.YZY_WEB_ROOT, "/yjxp/")
    description = "事件名称: " + event_title + "\n发布时间:" + create_time + "\n任务内容: " + task_base.task_description
    title = f"{event_title}任务"
    data = {
        "yzy_userid": yzy_userid,
        "mobile": yzy_account,
        "content": description,
        "recorded_by": recorded_by,
        "detail_url": detail_url,
        "foreign_key": task_base.task_id,
        "from_scenario": "task_registrations",
        "title": title
    }
    YzyApi.add_to_msg_queue(db, data)

    db_msg_center.add_message(db, "任务消息", to_user_id, title, task_base.task_description,
                              task_base.task_id, 'task_registrations')


class TaskQuery(BaseModel):
    """Optional equality filters accepted by the ``/select`` endpoint."""
    task_id: Optional[str] = None
    task_description: Optional[str] = None
    unit_name: Optional[str] = None
    registrar: Optional[str] = None
    creation_time: Optional[str] = None
    processing_status: Optional[str] = None


# NOTE(review): three handlers in this module are named ``select_tasks``.
# Routes are registered when the decorator runs, so all three work, but the
# module-level name only refers to the last one. Names are kept unchanged.
@router.post('/select')
@router.get('/select')
async def select_tasks(
    request: Request,
    db: Session = Depends(get_db),
    query: TaskQuery = Depends(),
    sortBy: str = Query(None, description="排序字段"),
    sortOrder: str = Query(None, description="排序顺序"),
    user_id=Depends(valid_access_token),
    pageNum: int = Query(1, gt=0, description="页码"),
    event_code: str = Query(None, description="事件ID"),
    pageSize: int = Query(10, gt=0, le=100, description="每页大小")
):
    """Return one page of non-deleted tasks belonging to *event_code*.

    Non-empty ``TaskQuery`` fields are applied as equality filters. When
    *sortBy* names an attribute of ``TaskRegistration`` the result is ordered
    by it (ascending only when *sortOrder* is ``'asc'``).

    Raises:
        HTTPException: 500 on any failure (after rolling back the session).
    """
    try:
        missing_event_code = db.query(EventBase).filter(EventBase.event_code == event_code).first()
        if not missing_event_code:
            return {
                "code": 500,
                "msg": "事件不存在"
            }

        data_query = db.query(TaskRegistration).filter(TaskRegistration.del_flag != '2')
        data_query = data_query.filter(TaskRegistration.event_code == event_code)

        # Apply the filter conditions: one equality filter per supplied field.
        filter_fields = ('task_id', 'task_description', 'unit_name',
                         'registrar', 'creation_time', 'processing_status')
        for field in filter_fields:
            value = getattr(query, field)
            if value:
                data_query = data_query.filter(getattr(TaskRegistration, field) == value)

        if sortBy and hasattr(TaskRegistration, sortBy):
            sort_attr = getattr(TaskRegistration, sortBy)
            data_query = data_query.order_by(sort_attr.asc() if sortOrder == 'asc' else sort_attr.desc())

        total_count = data_query.count()
        offset = (pageNum - 1) * pageSize
        rows = data_query.offset(offset).limit(pageSize).all()

        data = []
        for row in rows:
            info = get_model_dict(row)
            info['expire_time'] = get_date_str(info['expire_time'])
            info['imgList'] = db_task.get_image_file_list(db, info['task_id'])
            info['fileList'] = db_task.get_task_file_list(db, info['task_id'])
            data.append(info)

        return {
            "code": 200,
            "msg": "任务查询成功",
            "data": data,
            "total": total_count,
            # Ceiling division: number of pages needed for total_count rows.
            "pages": (total_count + pageSize - 1) // pageSize,
            "current_page": pageNum,
            "page_size": pageSize
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('/update')
async def update_task_status(
    request: Request,
    body=Depends(remove_xss_json),
    db: Session = Depends(get_db),
    user_id=Depends(valid_access_token)
):
    """Record a feedback entry for a task and update its processing status.

    Reads ``task_id``, ``processing_status``, ``feedback_content`` and
    ``fileList`` from the body; an optional ``lnglat`` pair supplies
    longitude (index 0) and latitude (index 1).

    Raises:
        HTTPException: 404 when the task does not exist, 500 on any other
            failure (after rolling back the session).
    """
    try:
        task_id_to_use = get_req_param(body, 'task_id')
        processing_status = get_req_param(body, 'processing_status')
        feedback_content = get_req_param(body, 'feedback_content')
        attachList = get_req_param(body, 'fileList')

        longitude = latitude = None
        if 'lnglat' in body:
            longitude = body['lnglat'][0]
            latitude = body['lnglat'][1]

        if not task_id_to_use:
            return Response(content="Missing required parameter 'task_id'", status_code=400)

        if processing_status not in ['已完成', '处理中', '待处理', '未完成']:
            return Response(content="processing_status must be '已完成' or '处理中' or '待处理' or '未完成'", status_code=400)

        task_entry = (db.query(TaskRegistration)
                      .filter(TaskRegistration.del_flag != '2')
                      .filter(TaskRegistration.task_id == task_id_to_use)
                      .first())
        if not task_entry:
            raise HTTPException(status_code=404, detail="任务不存在")

        new_feeback = TaskFeeback(
            task_id=task_id_to_use,
            feeback_user=task_entry.registrar,
            content=feedback_content,
            processing_status=processing_status,
            create_time=datetime.now(),
            feeback_type="0",
            leader_unit="",
            leader_name="",
            approval_content="",
            recorded_by=user_id,
            longitude=longitude,
            latitude=latitude
        )
        db.add(new_feeback)
        db.commit()

        task_entry.processing_status = processing_status
        task_entry.update_time = datetime.now()
        db.commit()

        if attachList is not None:
            _save_task_files(db, attachList, task_id_to_use, 'task_feeback')

        return {
            "code": 200,
            "msg": "任务状态更新成功",
            "data": {
                "task_id": task_id_to_use,
                "processing_status": task_entry.processing_status,
                "update_time": task_entry.update_time.isoformat()
            }
        }
    except HTTPException:
        # Keep the 404 above from being rewritten to a 500.
        raise
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.delete('/delete')
async def delete_task(
    request: Request,
    db: Session = Depends(get_db),
    user_id=Depends(valid_access_token)
):
    """Soft-delete a task by setting its ``del_flag`` to ``'2'``.

    The JSON body must carry ``taskID`` and the ``event_code`` of an existing
    event.

    Raises:
        HTTPException: 400 when ``taskID`` is missing, 404 when the task does
            not exist, 500 on any other failure.
    """
    try:
        body = await request.json()
        task_id_to_use = body.get('taskID')
        if not task_id_to_use:
            raise HTTPException(status_code=400, detail="Missing required parameter 'taskID'")

        event_code = db.query(EventBase).filter(EventBase.event_code == body.get('event_code')).first()
        if not event_code:
            return Response(content="事件不存在", status_code=400)

        task_entry = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id_to_use).first()
        if not task_entry:
            raise HTTPException(status_code=404, detail="任务不存在")

        task_entry.del_flag = '2'
        try:
            db.commit()
            return {
                "code": 200,
                "msg": "任务删除成功",
                "data": {
                    "task_id": task_entry.task_id
                }
            }
        except Exception as e:
            db.rollback()
            raise HTTPException(status_code=500, detail=f"An error occurred while deleting the task: {str(e)}")
    except HTTPException as e:
        raise e
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")


@router.get('/selectUnit')
async def select_unit(
    request: Request,
    db: Session = Depends(get_db),
    query: TaskQuery = Depends(),
    user_id=Depends(valid_access_token),
):
    """Return every ``TaskUnit`` row whose ``id`` is not ``'2'``.

    Raises:
        HTTPException: 500 on failure (after rolling back the session).
    """
    try:
        data_query = db.query(TaskUnit).filter(TaskUnit.id != '2').all()
        return {
            "code": 200,
            "msg": "查询成功",
            "data": data_query
        }
    except HTTPException as e:
        raise e
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"An unexpected error occurred: {str(e)}")


#
# Small-screen only; large-screen code is deliberately left untouched.
#
@router.get('/list')
async def select_tasks(
    request: Request,
    task_type: str = Query(None),
    processing_status: str = Query(None),
    search_keyword: str = Query(None),
    page: int = Query(1, gt=0, description="页码"),
    page_size: int = Query(10, gt=0, le=100, description="每页大小"),
    db: Session = Depends(get_db)
):
    """Return one page of non-deleted tasks that have an event code.

    Results are ordered by ``creation_time`` descending. An empty
    ``processing_status`` is reported as ``'处理中'``.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        # ``event_code is not None`` would be evaluated by Python (always
        # True for a column object); ``isnot(None)`` emits SQL ``IS NOT NULL``.
        data_query = (db.query(TaskRegistration)
                      .filter(TaskRegistration.del_flag != '2')
                      .filter(TaskRegistration.event_code.isnot(None)))

        # Apply the filter conditions.
        if task_type is not None:
            data_query = data_query.filter(TaskRegistration.task_type == task_type)
        if search_keyword is not None:
            data_query = data_query.filter(TaskRegistration.task_description.like('%{}%'.format(search_keyword)))
        if processing_status is not None:
            data_query = data_query.filter(TaskRegistration.processing_status == processing_status)

        total_count = data_query.count()
        offset = (page - 1) * page_size
        data_query = data_query.order_by(TaskRegistration.creation_time.desc()).offset(offset).limit(page_size)

        data = []
        for n in data_query.all():
            task = get_model_dict(n)
            if task['processing_status'] == '':
                task['processing_status'] = '处理中'
            task['creation_time'] = get_datetime_str(task['creation_time'])
            task['event_name'] = db_event_management.get_event_title(db, n.event_code)
            data.append(task)

        return {
            "code": 200,
            "msg": "任务查询成功",
            "data": data,
            "total": total_count
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/leader_request")
async def leader_request(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Store a request-to-leader as a ``TaskFeeback`` row built from the body.

    The body must contain ``task_id`` and ``processing_status``; the task's
    ``update_time`` is refreshed afterwards.

    Raises:
        HTTPException: 500 on any failure (after rolling back the session).
    """
    try:
        task_id = body['task_id']
        # Read only so that a missing key fails the request, as before; the
        # value itself is not written to the task (see NOTE below).
        processing_status = body['processing_status']

        new_feeback = TaskFeeback(
            **body,
            recorded_by=user_id
        )
        db.add(new_feeback)
        db.commit()

        # NOTE: updating processing_status here was commented out in the
        # original; only update_time is touched.
        db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).update(
            {"update_time": datetime.now()}
        )
        db.commit()

        return {
            "code": 200,
            "msg": "请示领导保存成功"
        }
    except Exception as e:
        # Handle the failure.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.post("/feeback")
async def feeback(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Store a task feedback entry plus attachments and update the task.

    ``fileList`` entries must carry ``file_name``, ``file_name_desc`` and
    ``status``. When ``feeback_user`` resolves to an empty string the current
    user's ``nick_name`` is used instead.

    Raises:
        HTTPException: 500 on any failure (after rolling back the session).
    """
    try:
        feeback_user = get_req_param_optional(body, 'feeback_user')
        task_id = body['task_id']
        processing_status = body['processing_status']

        # fileList is not a TaskFeeback column: remove it before **body.
        new_file_list = body.pop('fileList', [])

        if feeback_user == '':
            feeback_user = db_user.get_user_info(db, user_id).nick_name
        # feeback_user is passed explicitly below, so drop it from the body.
        body.pop('feeback_user', None)

        new_feeback = TaskFeeback(
            **body,
            feeback_user=feeback_user,
            recorded_by=user_id
        )
        db.add(new_feeback)
        db.commit()
        db.refresh(new_feeback)

        _save_task_files(db, new_file_list, new_feeback.id, 'task_feeback',
                         url_key='file_name', name_key='file_name_desc')

        db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).update(
            {"processing_status": processing_status, "update_time": datetime.now()}
        )
        db.commit()

        return {
            "code": 200,
            "msg": "任务反馈保存成功"
        }
    except Exception as e:
        # Handle the failure.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.get('/detail')
async def get_task_detail(
    request: Request,
    task_id: str = Query(None, description='任务编号'),
    db: Session = Depends(get_db)
):
    """Return one task together with its feedback entries and attachments.

    Returns a ``code: 500`` payload when no task matches *task_id*.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        # Build the feedback list.
        feeback_rows = db.query(TaskFeeback).filter(and_(TaskFeeback.task_id == task_id)).all()

        feebacks = []
        for feeback_row in feeback_rows:
            # Attachments of this feedback entry.
            # NOTE(review): the original filtered on InfoPublishFile.del_flag,
            # a different model; this assumes TaskFile has del_flag — confirm.
            file_rows = db.query(TaskFile).filter(and_(
                TaskFile.from_scenario == "task_feeback",
                TaskFile.foreign_key == feeback_row.id,
                TaskFile.del_flag == '0'
            )).all()
            fileList = [
                {
                    "name": file_row.file_name,
                    "url": file_row.storage_file_name
                }
                for file_row in file_rows
            ]

            info = get_model_dict(feeback_row)
            info['create_time'] = get_datetime_str(feeback_row.create_time)
            info['approval_time'] = get_datetime_str(feeback_row.approval_time)
            info['fileList'] = fileList
            info['fileCount'] = len(fileList)
            feebacks.append(info)

        # Run the task query.
        row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first()
        if row is None:
            return {
                "code": 500,
                "msg": "查询失败"
            }

        return {
            "code": 200,
            "msg": "查询成功",
            "data": {
                "id": row.id,
                "task_id": row.task_id,
                "task_type": row.task_type,
                "event_code": row.event_code,
                "event_name": db_event_management.get_event_title(db, row.event_code),
                "processing_status": row.processing_status,
                "expire_time": get_datetime_str(row.expire_time),
                "update_time": get_datetime_str(row.update_time),
                "feeback_user": row.feeback_user,
                "task_description": row.task_description,
                "feebacks": feebacks
            }
        }
    except Exception as e:
        # Handle the failure.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# Leader-request list.
@router.get('/request/list')
async def select_tasks(
    request: Request,
    task_type: str = Query(None),
    processing_status: str = Query(None),
    search_keyword: str = Query(None),
    page: int = Query(1, gt=0, description="页码"),
    page_size: int = Query(10, gt=0, le=100, description="每页大小"),
    db: Session = Depends(get_db)
):
    """Return one page of feedback rows with ``feeback_type == '1'``.

    Results are ordered by ``create_time`` descending; each entry is enriched
    with the recorder's nick name and the title of the task's event.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        data_query = db.query(TaskFeeback).filter(TaskFeeback.feeback_type == '1')

        # Apply the filter conditions.
        if task_type is not None:
            subquery = db.query(TaskRegistration.task_id).filter(
                and_(TaskRegistration.del_flag == '0', TaskRegistration.task_type == task_type)
            ).subquery()
            data_query = data_query.filter(TaskFeeback.task_id == subquery.c.task_id)
        if processing_status is not None:
            data_query = data_query.filter(TaskFeeback.processing_status == processing_status)
        if search_keyword is not None:
            data_query = data_query.filter(TaskFeeback.content.like('%{}%'.format(search_keyword)))

        total_count = data_query.count()
        offset = (page - 1) * page_size
        data_query = data_query.order_by(TaskFeeback.create_time.desc()).offset(offset).limit(page_size)

        data = []
        for n in data_query.all():
            task = get_model_dict(n)
            task['nick_name'] = db_user.get_nick_name_by_id(db, task['recorded_by'])
            task['create_time'] = get_datetime_str(task['create_time'])
            task['approval_time'] = get_datetime_str(task['approval_time'])
            # The approval time, when present, takes precedence as update_time.
            task['update_time'] = task['create_time']
            if task['approval_time'] is not None:
                task['update_time'] = task['approval_time']

            task['event_name'] = ''
            reg_info = db.query(TaskRegistration).filter(TaskRegistration.task_id == task['task_id']).first()
            if reg_info is not None:
                reg_info = get_model_dict(reg_info)
                task['event_name'] = db_event_management.get_event_title(db, reg_info['event_code'])

            data.append(task)

        return {
            "code": 200,
            "msg": "请示查询成功",
            "data": data,
            "total": total_count
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


# Leader-request approval.
@router.post("/approval")
async def approval(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Store the approval text for a request and mark it ``'已完成'``.

    Returns a ``code: 500`` payload when ``feeback_id`` matches no row.

    Raises:
        HTTPException: 500 on any failure (after rolling back the session).
    """
    try:
        feeback_id = get_req_param(body, "feeback_id")
        approval_content = get_req_param_optional(body, 'approval_content')

        row = db.query(TaskFeeback).filter(TaskFeeback.id == feeback_id).first()
        if row is None:
            return {
                "code": 500,
                "msg": "请示不存在"
            }

        row.approval_content = approval_content
        row.approval_time = datetime.now()
        row.processing_status = '已完成'
        db.commit()

        return {
            "code": 200,
            "msg": "请示批复保存成功"
        }
    except Exception as e:
        # Handle the failure.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))