from fastapi import APIRouter, Request, Depends, HTTPException, Query from sqlalchemy.exc import IntegrityError from fastapi.responses import HTMLResponse, FileResponse from fastapi.responses import JSONResponse from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from models import * import json import random import os from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from utils.three_proofing_responsible_util import * from utils.ry_system_util import * from common.db import db_event_management, db_user, db_area, db_emergency_plan from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta import pandas as pd from common.db import db_dept from exceptions import AppException router = APIRouter() @router.post('/create') async def create_contact( db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: # 提取请求数据 unit_name = body['unit_name'] name = body['name'] area_code = body['area_code'] position = body['position'] phone = body['phone'] telephone= '' if 'telephone' in body: telephone=body['telephone'] order_num = -1 if 'order_num'in body: order_num=body['order_num'] unit_id = db_dept.get_dept_id_by_name(db, unit_name) user_id_1 = db_user.get_user_id_by_phonenumber(db,phone) # 创建新的预案记录 new_person = ThreeProofingResponsiblePerson( unit_id=unit_id, unit_name=unit_name, name=name, area_code=area_code, position=position, phone=phone, telephone=telephone, user_id = user_id_1, order_num=order_num, create_by=user_id ) # 添加到数据库会话并提交 type_list = body['type_list'] if isinstance(type_list,list) and len(type_list)>0: db.add(new_person) db.commit() else: return JSONResponse(status_code=404,content={ 'code': 404, 'msg': '责任类型不能为空' }) try: for type_info in type_list: type_parent_id = type_info['type_parent_id']#get_type_parent_id_by_type_id(db,type_id) for type_id in type_info['children']: new_person_type = ThreeProofingResponsiblePersonType( type_parent_id = type_parent_id, type_id = type_id, person_id = new_person.id, create_by=user_id ) db.add(new_person_type) if type_parent_id in ('5','7','9'): if 'children2' in type_info: for other_type_id in type_info['children2']: new_person_other_type = ThreeProofingResponsiblePersonOtherType( type_parent_id=type_parent_id, other_type_id=other_type_id, person_id=new_person.id, create_by=user_id ) db.add(new_person_other_type) if type_parent_id in ('4','5','7','10','11'): dept_name = None if 'dept_name' in type_info: dept_name = type_info['dept_name'] other_type_2_name = None if 'other_type_2_name' in type_info: other_type_2_name = type_info['other_type_2_name'] denger_point_name = None if 'denger_point_name' in type_info: denger_point_name = type_info['denger_point_name'] new_person_other_info = ThreeProofingResponsiblePersonOtherInfo( type_parent_id = type_parent_id, dept_name = dept_name, other_type_2_name = other_type_2_name, denger_point_name = denger_point_name, person_id = new_person.id, create_by=user_id ) db.add(new_person_other_info) except: db.rollback() traceback.print_exc() new_person.del_flag='2' db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "创建成功", "data": None } except Exception as e: # 处理异常 db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.put('/update') async def update_contact( db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: # 提取请求数据 id = body['id'] person = get_person_info_by_id(db,id) if not person: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '责任人不存在' }) person.unit_name = body['unit_name'] person.name = body['name'] person.area_code = body['area_code'] person.position = body['position'] person.phone = body['phone'] if 'telephone' in body: person.telephone=body['telephone'] person.order_num = body['order_num'] person.unit_id = db_dept.get_dept_id_by_name(db, body['unit_name']) type_list = body['type_list'] old_person_type_list=get_person_type_by_person_id(db,person.id) for old_person_type in old_person_type_list: old_person_type.del_flag='2' old_person_other_type_list = get_person_other_info_by_person_id(db,person.id) for old_person_other_type in old_person_other_type_list: old_person_other_type.del_flag = '2' for type_info in type_list: type_parent_id = type_info['type_parent_id'] # get_type_parent_id_by_type_id(db,type_id) for type_id in type_info['children']: new_person_type = ThreeProofingResponsiblePersonType( type_parent_id=type_parent_id, type_id=type_id, person_id=id, create_by=user_id ) db.add(new_person_type) if type_parent_id in ('5', '7', '9'): if 'children2' in type_info: for other_type_id in type_info['children2']: new_person_other_type = ThreeProofingResponsiblePersonOtherType( type_parent_id=type_parent_id, other_type_id=other_type_id, person_id=person.id, create_by=user_id ) db.add(new_person_other_type) if type_parent_id in ('4', '5', '7', '10', '11'): dept_name = None if 'dept_name' in type_info: dept_name = type_info['dept_name'] other_type_2_name = None if 'other_type_2_name' in type_info: other_type_2_name = type_info['other_type_2_name'] denger_point_name = None if 'denger_point_name' in type_info: denger_point_name = type_info['denger_point_name'] new_person_other_info = ThreeProofingResponsiblePersonOtherInfo( type_parent_id=type_parent_id, dept_name=dept_name, other_type_2_name=other_type_2_name, denger_point_name=denger_point_name, person_id=person.id, create_by=user_id ) db.add(new_person_other_info) # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "更新成功", "data": None } except Exception as e: # 处理异常 db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.get('/list') async def get_emergency_contact_list( type_parent_id: str = Query(None, description='单位名称'), area_code:str = Query(None, description='单位名称'), Name: str = Query(None, description='联系人'), page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db), user_id=Depends(valid_access_token) ): try: # 构建查询 query = db.query(ThreeProofingResponsiblePerson) query = query.filter(ThreeProofingResponsiblePerson.del_flag == '0') # 应用查询条件 if type_parent_id: person_list = get_person_list_by_type_parent_id(db,type_parent_id) query = query.filter(ThreeProofingResponsiblePerson.id.in_(person_list)) if Name: query = query.filter(ThreeProofingResponsiblePerson.name.like(f'%{Name}%')) if area_code: query = query.filter(ThreeProofingResponsiblePerson.area_code==area_code) # 计算总条目数 total_items = query.count() # 排序 query = query.order_by(ThreeProofingResponsiblePerson.order_num.asc()) query = query.order_by(ThreeProofingResponsiblePerson.create_time.desc()) # 执行分页查询 contact_infos = query.offset((page - 1) * pageSize).limit(pageSize).all() # 将查询结果转换为列表形式的字典 contact_infos_list = [] for info in contact_infos: type_parent_id_list = get_type_parent_id_by_person_id(db,info.id) type_parent_list = [] type_parent_list2 = [] for type_parent in type_parent_id_list: if type_parent not in type_parent_list2: dict_data = get_dict_data_info(db,'three_proofing',type_parent) type_parent_list2.append(type_parent) type_parent_list.append({"type_parent_id":type_parent,"type_parent":dict_data.dict_label}) area_info = id_get_area_info(db,info.area_code) user_info = user_id_get_user_info(db,info.create_by) area_list = db_area.id_get_area_parent_list(db,info.area_code,[]) contact_infos_list.append({ "id": info.id, "unit_id": info.unit_id, "unit_name": info.unit_name, "name": info.name, "area_list":area_list, "area_code": info.area_code, "area_name": area_info.area_name, "position": info.position, "phone": info.phone, "telephone":info.telephone, "order_num":info.order_num, "online_status":'0', "create_time": info.create_time.strftime('%Y-%m-%d %H:%M:%S'), "create_by":info.create_by, "create_user":user_info.nick_name, "type_parent_list":type_parent_list }) # 返回结果+ return { "code": 200, "msg": "成功", "data": contact_infos_list, "total": total_items } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.get('/info/{id}') async def get_emergency_contact_id_info( id: str, db: Session = Depends(get_db), user_id=Depends(valid_access_token) ): try: contact = get_person_info_by_id(db,id) if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) # 将查询结果转换为列表形式的字典 area_info = id_get_area_info(db,contact.area_code) user_info = user_id_get_user_info(db,contact.create_by) area_list = db_area.id_get_area_parent_list(db, contact.area_code, []) contact_result = { "id": contact.id, "unit_id": contact.unit_id, "unit_name": contact.unit_name, "name": contact.name, "area_list":area_list, "area_code":contact.area_code, "area_name": area_info.area_name, "position": contact.position, "phone": contact.phone, "telephone":contact.telephone, "order_num":contact.order_num, "online_status":'0', "create_time": contact.create_time.strftime('%Y-%m-%d %H:%M:%S'), "create_by":contact.create_by, "create_user":user_info.nick_name, "type_list":[] } type_parent_id_list = [] type_list = get_person_type_by_person_id(db,contact.id) for type_info in type_list: if type_info.type_parent_id not in type_parent_id_list: type_parent_id_list.append(type_info.type_parent_id) for type_parent_id in type_parent_id_list: dict_data = get_dict_data_info(db, 'three_proofing', type_parent_id) type_data = {"type_parent_id": type_parent_id, "type_parent": dict_data.dict_label,"children":[]} for type_info in get_person_type_by_person_id_and_type_parent_id(db,contact.id,type_parent_id): type_data_info = get_type_info_by_id(db,type_info.type_id) type_data['children'].append({"id": type_info.type_id, "label": type_data_info.type_name}) other_info = get_person_other_info_by_person_id_and_type_parent_id(db, contact.id, type_parent_id) if other_info: type_data['dept_name'] = other_info.dept_name type_data['denger_point_name'] = other_info.denger_point_name type_data['other_type_2_name'] = other_info.other_type_2_name other_type_list = get_person_other_type_by_person_id_and_type_parent_id(db, contact.id, type_parent_id) if other_type_list: type_data['children2'] = [] for other_type in other_type_list: other_type_info = get_other_type_info_by_id(db, other_type.other_type_id) label= '' if other_type_info: label = other_type_info.type_name type_data['children2'].append({"id": other_type.other_type_id, "label": label}) contact_result['type_list'].append(type_data) # 返回结果 return { "code": 200, "msg": "成功", "data": contact_result } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete') async def delete_emergency_plans( ids: list, db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(ThreeProofingResponsiblePerson) query = query.filter(ThreeProofingResponsiblePerson.del_flag != '2') query = query.filter(ThreeProofingResponsiblePerson.id.in_(ids)) contacts = query.all() if not contacts: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) for contact in contacts: contact.del_flag = '2' contact.create_by = user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete/{id}') async def delete_emergency_plans( id: int, db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: contact = get_person_info_by_id(db, id) if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) contact.del_flag = '2' contact.create_by = user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) # # @router.post('/createImport') # async def create_contact( # db: Session = Depends(get_db), # body=Depends(remove_xss_json), # user_id=Depends(valid_access_token) # ): # try: # # 提取请求数据 # filename = body['filename'] # if len(filename) == 0: # raise Exception() # # file_name = filename[0]['url'] # file_path = f'/data/upload/mergefile/uploads/{file_name}' # # # 检查文件是否存在 # if not os.path.isfile(file_path): # return JSONResponse(status_code=404, content={ # 'errcode': 404, # 'errmsg': f'{file_name}不存在' # }) # # # 定义预期的列名和对应的最大长度 # expected_columns = { # '单位名称': 255, # '联系人': 255, # '职务': 255, # '粤政易手机号码': 20 # } # try: # df = pd.read_excel(file_path, engine='openpyxl', header=0) # except Exception as e: # raise AppException(500, "请按模板上传!") # # # 读取Excel文件 # # try: # # df = pd.read_excel(file_path, engine='openpyxl', header=0) # # except Exception as e: # # return f"读取Excel文件时出错: {e}" # # # 获取前两行的数据 # first_two_rows = df # # print(first_two_rows) # # 检查列名是否匹配 # actual_columns = first_two_rows.columns.tolist() # # print('actual_columns', actual_columns) # missing_columns = [col for col in expected_columns.keys() if col not in actual_columns] # # if len(missing_columns) > 0: # raise AppException(500, "请按模板上传") # # head1 = df.head(1) # head1data = head1.to_dict(orient='records')[0] # print(head1data) # if head1data['单位名称'] == '填写单位名称' and head1data['联系人'] == '填写单位联系人' and head1data['职务'] == '填写联系人职务' and \ # head1data['粤政易手机号码'] == '填写联系人的粤政易注册手机号码': # df = df.drop(index=0) # else: # raise AppException(500, "请按模板上传。") # # # 检查前两行的字段长度 # for index, row in first_two_rows.iterrows(): # for col, max_length in expected_columns.items(): # # if pd.isna(row[col]): # # return f"第{index + 1}行的'{col}'字段不能为空。" # if pd.notna(row[col]) and len(str(row[col])) > max_length: # raise AppException(500, f"第{index + 1}行的'{col}'字段长度超过{max_length}字符。") # # data = df.to_dict(orient='records') # # print(data) # infos = [] # for info in data: # if pd.isna(info['单位名称']) and pd.isna(info['联系人']) and pd.isna(info['粤政易手机号码']): # continue # if pd.isna(info['单位名称']) or pd.isna(info['联系人']) or pd.isna(info['粤政易手机号码']): # return "单位名称、联系人、粤政易手机号码为必填" # if pd.isna(info['职务']): # info['职务'] = None # infos.append(info) # # # 创建新的预案记录 # for contact in infos: # unit_id = db_dept.get_dept_id_by_name(db, contact['单位名称']) # if unit_id == '': # raise AppException(500, "单位名称不正确") # # # 删除之前同一个部门的人员 # db.query(EmergencyContactInfo).filter( # and_(EmergencyContactInfo.del_flag == "0", EmergencyContactInfo.unit_id == unit_id, )).update( # {"del_flag": "2"}) # # new_contact = EmergencyContactInfo( # unit_id=unit_id, # unit_name=contact['单位名称'], # contact_name=contact['联系人'], # position=contact['职务'], # yue_gov_ease_phone=contact['粤政易手机号码'], # create_by=user_id # ) # # # 添加到数据库会话 # db.add(new_contact) # # 提交 # db.commit() # # # 返回创建成功的响应 # return { # "code": 200, # "msg": "创建成功", # "data": None # } # # except AppException as e: # return { # "code": 500, # "msg": e.msg # } # # except Exception as e: # traceback.print_exc() # # 处理异常 # raise HTTPException(status_code=500, detail=str(e))