from fastapi import APIRouter, Request, Depends, HTTPException, Query, BackgroundTasks from sqlalchemy.exc import IntegrityError from fastapi.responses import HTMLResponse, FileResponse from fastapi.responses import JSONResponse from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from models import * import json import random import os from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from utils.three_proofing_responsible_util import * from utils.ry_system_util import * from common.db import db_event_management, db_user, db_area, db_emergency_plan from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta import pandas as pd import xlrd from common.db import db_dept from exceptions import AppException router = APIRouter() @router.post('/create') async def create_contact( db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: # 提取请求数据 unit_name = body['unit_name'] name = body['name'] area_code = body['area_code'] position = body['position'] phone = body['phone'] telephone= '' if 'telephone' in body: telephone=body['telephone'] order_num = -1 if 'order_num'in body: order_num=body['order_num'] unit_id = db_dept.get_dept_id_by_name(db, unit_name) user_id_1 = db_user.get_user_id_by_phonenumber(db,phone) # 创建新的预案记录 new_person = ThreeProofingResponsiblePerson( unit_id=unit_id, unit_name=unit_name, name=name, area_code=area_code, position=position, phone=phone, telephone=telephone, user_id = user_id_1, order_num=order_num, create_by=user_id ) # 添加到数据库会话并提交 type_list = body['type_list'] if isinstance(type_list,list) and len(type_list)>0: db.add(new_person) db.commit() else: return JSONResponse(status_code=404,content={ 'code': 404, 'msg': '责任类型不能为空' }) try: for type_info in type_list: type_parent_id = type_info['type_parent_id']#get_type_parent_id_by_type_id(db,type_id) for type_id in type_info['children']: new_person_type = ThreeProofingResponsiblePersonType( type_parent_id = type_parent_id, type_id = type_id, person_id = new_person.id, create_by=user_id ) db.add(new_person_type) if type_parent_id in ('5','7','9'): if 'children2' in type_info: for other_type_id in type_info['children2']: new_person_other_type = ThreeProofingResponsiblePersonOtherType( type_parent_id=type_parent_id, other_type_id=other_type_id, person_id=new_person.id, create_by=user_id ) db.add(new_person_other_type) if type_parent_id in ('4','5','7','10','11'): dept_name = None if 'dept_name' in type_info: dept_name = type_info['dept_name'] other_type_2_name = None if 'other_type_2_name' in type_info: other_type_2_name = type_info['other_type_2_name'] denger_point_name = None if 'denger_point_name' in type_info: denger_point_name = type_info['denger_point_name'] new_person_other_info = ThreeProofingResponsiblePersonOtherInfo( type_parent_id = type_parent_id, dept_name = dept_name, other_type_2_name = other_type_2_name, denger_point_name = denger_point_name, person_id = new_person.id, create_by=user_id ) db.add(new_person_other_info) except: db.rollback() traceback.print_exc() new_person.del_flag='2' db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "创建成功", "data": None } except Exception as e: # 处理异常 db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.put('/update') async def update_contact( db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: # 提取请求数据 id = body['id'] person = get_person_info_by_id(db,id) if not person: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '责任人不存在' }) person.unit_name = body['unit_name'] person.name = body['name'] person.area_code = body['area_code'] person.position = body['position'] person.phone = body['phone'] if 'telephone' in body: person.telephone=body['telephone'] person.order_num = body['order_num'] person.unit_id = db_dept.get_dept_id_by_name(db, body['unit_name']) type_list = body['type_list'] old_person_type_list=get_person_type_by_person_id(db,person.id) for old_person_type in old_person_type_list: old_person_type.del_flag='2' old_person_other_info_list = get_person_other_info_by_person_id(db,person.id) for old_person_other_info in old_person_other_info_list: old_person_other_info.del_flag = '2' old_person_other_type_list = get_person_other_type_by_person_id(db,person.id) for old_person_other_type in old_person_other_type_list: old_person_other_type.del_flag = '2' for type_info in type_list: type_parent_id = type_info['type_parent_id'] # get_type_parent_id_by_type_id(db,type_id) for type_id in type_info['children']: new_person_type = ThreeProofingResponsiblePersonType( type_parent_id=type_parent_id, type_id=type_id, person_id=id, create_by=user_id ) db.add(new_person_type) if type_parent_id in ('5', '7', '9'): if 'children2' in type_info: for other_type_id in type_info['children2']: new_person_other_type = ThreeProofingResponsiblePersonOtherType( type_parent_id=type_parent_id, other_type_id=other_type_id, person_id=person.id, create_by=user_id ) db.add(new_person_other_type) if type_parent_id in ('4', '5', '7', '10', '11'): dept_name = None if 'dept_name' in type_info: dept_name = type_info['dept_name'] other_type_2_name = None if 'other_type_2_name' in type_info: other_type_2_name = type_info['other_type_2_name'] denger_point_name = None if 'denger_point_name' in type_info: denger_point_name = type_info['denger_point_name'] new_person_other_info = ThreeProofingResponsiblePersonOtherInfo( type_parent_id=type_parent_id, dept_name=dept_name, other_type_2_name=other_type_2_name, denger_point_name=denger_point_name, person_id=person.id, create_by=user_id ) db.add(new_person_other_info) # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "更新成功", "data": None } except Exception as e: # 处理异常 db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.get('/list') async def get_emergency_contact_list( type_parent_id: str = Query(None, description='单位名称'), area_code:str = Query(None, description='单位名称'), Name: str = Query(None, description='联系人'), page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db), user_id=Depends(valid_access_token) ): try: # 构建查询 query = db.query(ThreeProofingResponsiblePerson) query = query.filter(ThreeProofingResponsiblePerson.del_flag == '0') # 应用查询条件 if type_parent_id: person_list = get_person_list_by_type_parent_id(db,type_parent_id) query = query.filter(ThreeProofingResponsiblePerson.id.in_(person_list)) if Name: query = query.filter(ThreeProofingResponsiblePerson.name.like(f'%{Name}%')) def get_area_chli(area_list : list,parent_id : int): areas = parent_id_get_area_info(db,parent_id) if areas: for area in areas: area_list.append(area.id) get_area_chli(area_list, area.id) return area_list if area_code: query = query.filter(ThreeProofingResponsiblePerson.area_code.in_(get_area_chli([area_code],area_code))) # if area_code: # query = query.filter(ThreeProofingResponsiblePerson.area_code==area_code) # 计算总条目数 total_items = query.count() # 排序 query = query.order_by(ThreeProofingResponsiblePerson.order_num.asc()) query = query.order_by(ThreeProofingResponsiblePerson.create_time.desc()) # 执行分页查询 contact_infos = query.offset((page - 1) * pageSize).limit(pageSize).all() # 将查询结果转换为列表形式的字典 contact_infos_list = [] for info in contact_infos: type_parent_id_list = get_type_parent_id_by_person_id(db,info.id) type_parent_list = [] type_parent_list2 = [] for type_parent in type_parent_id_list: if type_parent not in type_parent_list2: dict_data = get_dict_data_info(db,'three_proofing',type_parent) type_parent_list2.append(type_parent) type_parent_list.append({"type_parent_id":type_parent,"type_parent":dict_data.dict_label}) area_info = id_get_area_info(db,info.area_code) user_info = user_id_get_user_info(db,info.create_by) area_list = db_area.id_get_area_parent_list(db,info.area_code,[]) contact_infos_list.append({ "id": info.id, "unit_id": info.unit_id, "unit_name": info.unit_name, "name": info.name, "area_list":area_list, "area_code": info.area_code, "area_name": area_info.area_name, "position": info.position, "phone": info.phone, "telephone":info.telephone, "order_num":info.order_num, "online_status":'0', "create_time": info.create_time.strftime('%Y-%m-%d %H:%M:%S'), "create_by":info.create_by, "create_user":user_info.nick_name, "type_parent_list":type_parent_list }) # 返回结果+ return { "code": 200, "msg": "成功", "data": contact_infos_list, "total": total_items } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.get('/info/{id}') async def get_emergency_contact_id_info( id: str, db: Session = Depends(get_db), user_id=Depends(valid_access_token) ): try: contact = get_person_info_by_id(db,id) if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) # 将查询结果转换为列表形式的字典 area_info = id_get_area_info(db,contact.area_code) user_info = user_id_get_user_info(db,contact.create_by) area_list = db_area.id_get_area_parent_list(db, contact.area_code, []) contact_result = { "id": contact.id, "unit_id": contact.unit_id, "unit_name": contact.unit_name, "name": contact.name, "area_list":area_list, "area_code":contact.area_code, "area_name": area_info.area_name, "position": contact.position, "phone": contact.phone, "telephone":contact.telephone, "order_num":contact.order_num, "online_status":'0', "create_time": contact.create_time.strftime('%Y-%m-%d %H:%M:%S'), "create_by":contact.create_by, "create_user":user_info.nick_name, "type_list":[] } type_parent_id_list = [] type_list = get_person_type_by_person_id(db,contact.id) for type_info in type_list: if type_info.type_parent_id not in type_parent_id_list: type_parent_id_list.append(type_info.type_parent_id) for type_parent_id in type_parent_id_list: dict_data = get_dict_data_info(db, 'three_proofing', type_parent_id) type_data = {"type_parent_id": type_parent_id, "type_parent": dict_data.dict_label,"children":[],"labelData":[]} for type_info in get_person_type_by_person_id_and_type_parent_id(db,contact.id,type_parent_id): type_data_info = get_type_info_by_id(db,type_info.type_id) # type_data['children'].append({"id": type_info.type_id, "label": type_data_info.type_name}) type_data['children'].append(type_info.type_id) type_data['labelData'].append( type_data_info.type_name) other_info = get_person_other_info_by_person_id_and_type_parent_id(db, contact.id, type_parent_id) if other_info: type_data['dept_name'] = other_info.dept_name type_data['denger_point_name'] = other_info.denger_point_name type_data['other_type_2_name'] = other_info.other_type_2_name other_type_list = get_person_other_type_by_person_id_and_type_parent_id(db, contact.id, type_parent_id) if other_type_list: type_data['children2'] = [] type_data['other_type_label'] = [] for other_type in other_type_list: other_type_info = get_other_type_info_by_id(db, other_type.other_type_id) label= '' if other_type_info: label = other_type_info.type_name # type_data['children2'].append({"id": other_type.other_type_id, "label": label}) type_data['children2'].append(other_type.other_type_id) type_data['other_type_label'].append(label) contact_result['type_list'].append(type_data) # 返回结果 return { "code": 200, "msg": "成功", "data": contact_result } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete') async def delete_emergency_plans( ids: list, db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(ThreeProofingResponsiblePerson) query = query.filter(ThreeProofingResponsiblePerson.del_flag != '2') query = query.filter(ThreeProofingResponsiblePerson.id.in_(ids)) contacts = query.all() if not contacts: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) for contact in contacts: contact.del_flag = '2' contact.create_by = user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete/{id}') async def delete_emergency_plans( id: int, db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: contact = get_person_info_by_id(db, id) if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) contact.del_flag = '2' contact.create_by = user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) def string_type_parent_id_create_data(db,string,type_parent_id,file_info,new_person,user_id,row) : type_name_list = [i for i in string.split(',')] reslte = [] for type_name in type_name_list: type_id = get_type_id_by_type_parent_id_and_type_name(db, '1', type_name) if type_id: new_person_type = ThreeProofingResponsiblePersonType( type_parent_id=type_parent_id, type_id=type_id, person_id=new_person.id, create_by=user_id ) reslte.append(new_person_type) else: file_info.remark= file_info.remark+f'\n行<{row+1}>责任类别未找到<{type_name}>' return reslte ,False return reslte ,True def import_data(db,file_path,user_id,file_info): book = xlrd.open_workbook(file_path) sheet = book.sheet_by_index(0) data = [] import_status = True for row in range(4, sheet.nrows): # 姓名 name = sheet.cell(row, 0).value if name == '': file_info.remark = file_info.remark+f'\n行<{row+1}>姓名不能为空<{name}>' import_status = False continue # 所属单位 unit_name = sheet.cell(row, 1).value if unit_name == '': file_info.remark = file_info.remark+f'\n行<{row+1}>所属单位不能为空<{unit_name}>' import_status = False continue unit_id = db_dept.get_dept_id_by_name(db, unit_name) # 职务 position = sheet.cell(row, 2).value if position =='': file_info.remark = file_info.remark+f'\n行<{row+1}>职务不能为空<{position}>' import_status = False continue # 电话号码(如有多个手机号请用“,”分隔) phone = int(sheet.cell(row, 3).value) if phone =='': file_info.remark = file_info.remark+f'\n行<{row+1}>电话号码不能为空<{phone}>' import_status = False continue phone_list = [i for i in phone.split(',')] user_id_1=-1 for i in phone_list: user_id_1 = db_user.get_user_id_by_phonenumber(db,i) if user_id_1 != -1: break # 办公电话 # (选填,格式:区号-电话号码) telephone = sheet.cell(row, 4).value # 排位顺序 # (选填,请输入排序号1-9999,排序号数值越小越靠前,不填则默认排至最末) order_num = sheet.cell(row, 5).value if order_num == '': order_num=-1 area_name = sheet.cell(row, 7).value if area_name=='': area_code=2 else: area_code=get_area_info_by_area_name(db,area_name) if area_code is None: file_info.remark = file_info.remark+f'\n行<{row+1}>责任区域未找到<{area_name}>' import_status = False continue new_person = ThreeProofingResponsiblePerson( unit_id=unit_id, unit_name=unit_name, name=name, area_code=area_code, position=position, phone=phone, telephone=telephone, user_id=user_id_1, order_num=order_num, create_by=user_id ) data.append(new_person) db.add(new_person) db.commit() # 党委政府 a1 = sheet.cell(row, 6).value if a1 != '': new_type_list,status = string_type_parent_id_create_data(db,a1,'1',file_info,new_person,user_id) import_status = status if status: db.add_all(new_type_list) data +=new_type_list # type_name_list = [i for i in a1.split(',')] # for type_name in type_name_list: # type_id = get_type_id_by_type_parent_id_and_type_name(db, '1', type_name) # if type_id: # pass # 三防指挥部 b1 = sheet.cell(row, 8).value if b1 != '': new_type_list,status = string_type_parent_id_create_data(db,b1,'2',file_info,new_person,user_id) import_status = status if status: db.add_all(new_type_list) data +=new_type_list b2 = sheet.cell(row, 9).value # 应急部门 c1 = sheet.cell(row, 10).value if c1!='': new_type_list, status = string_type_parent_id_create_data(db, c1, '3', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list # 成员单位 d1 = sheet.cell(row, 11).value if d1!='': new_type_list, status = string_type_parent_id_create_data(db, d1, '4', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list d2 = sheet.cell(row, 12).value if d2!='': pass # 重点部门 e1 = sheet.cell(row, 13).value if e1!='': new_type_list, status = string_type_parent_id_create_data(db, e1, '5', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list e2 = sheet.cell(row, 14).value e3 = sheet.cell(row, 15).value # 行政村 f1 = sheet.cell(row, 16).value if f1!='': new_type_list, status = string_type_parent_id_create_data(db, f1, '6', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list # 水利工程 g1 = sheet.cell(row, 10).value if g1!='': new_type_list, status = string_type_parent_id_create_data(db, g1, '7', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list g2 = sheet.cell(row, 11).value g3 = sheet.cell(row, 12).value # 受威胁转移 h1 = sheet.cell(row, 13).value if h1!='': new_type_list, status = string_type_parent_id_create_data(db, h1, '8', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list # 抢险队伍 j1 = sheet.cell(row, 14).value if j1!='': new_type_list, status = string_type_parent_id_create_data(db, j1, '9', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list j2 = sheet.cell(row, 15).value # 地质灾害 k1 = sheet.cell(row, 16).value if k1!='': new_type_list, status = string_type_parent_id_create_data(db, k1, '10', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list k2 = sheet.cell(row, 17).value # 其他 l1 = sheet.cell(row, 18).value if l1!='': new_type_list, status = string_type_parent_id_create_data(db, l1, '11', file_info, new_person, user_id) import_status = status if status: db.add_all(new_type_list) data += new_type_list l2 = sheet.cell(row, 19).value if import_status == False: for info in data: db.delete(info) db.commit() @router.post('/createImport') async def create_contact( background_tasks: BackgroundTasks, db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token), ): try: # 提取请求数据 filename = body['filename'] file_name_desc = body['file_name_desc'] if len(filename) == 0: raise Exception() file_name = filename[0]['url'] file_path = f'/data/upload/mergefile/uploads/{file_name}' # 检查文件是否存在 if not os.path.isfile(file_path): return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': f'{file_name}不存在' }) new_file = ThreeProofingResponsiblePersonImportFileStatus( file_uuid=filename, file_name = file_name_desc, status = '1', user_id=user_id ) db.add(new_file) db.commit() background_tasks.add_task(import_data,db,file_path, user_id,new_file) # 返回创建成功的响应 return { "code": 200, "msg": "成功", "data": None } except AppException as e: return { "code": 500, "msg": e.msg } except Exception as e: traceback.print_exc() # 处理异常 db.rollback() raise HTTPException(status_code=500, detail=str(e))