123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567 |
- from fastapi import APIRouter, Request, Depends, HTTPException, Query
- from sqlalchemy.exc import IntegrityError
- from fastapi.responses import HTMLResponse, FileResponse
- from fastapi.responses import JSONResponse
- from database import get_db
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.orm import Session
- from models import *
- import json
- import random
- import os
- from sqlalchemy import create_engine, select
- from typing import Optional
- from utils.StripTagsHTMLParser import *
- from utils.three_proofing_responsible_util import *
- from utils.ry_system_util import *
- from common.db import db_event_management, db_user, db_area, db_emergency_plan
- from common.security import valid_access_token
- import traceback
- from utils import *
- from datetime import datetime, timedelta
- import pandas as pd
- from common.db import db_dept
- from exceptions import AppException
- router = APIRouter()
- @router.post('/create')
- async def create_contact(
- db: Session = Depends(get_db),
- body=Depends(remove_xss_json),
- user_id=Depends(valid_access_token)
- ):
- try:
- # 提取请求数据
- unit_name = body['unit_name']
- name = body['name']
- area_code = body['area_code']
- position = body['position']
- phone = body['phone']
- telephone= ''
- if 'telephone' in body:
- telephone=body['telephone']
- order_num = -1
- if 'order_num'in body:
- order_num=body['order_num']
- unit_id = db_dept.get_dept_id_by_name(db, unit_name)
- user_id_1 = db_user.get_user_id_by_phonenumber(db,phone)
- # 创建新的预案记录
- new_person = ThreeProofingResponsiblePerson(
- unit_id=unit_id,
- unit_name=unit_name,
- name=name,
- area_code=area_code,
- position=position,
- phone=phone,
- telephone=telephone,
- user_id = user_id_1,
- order_num=order_num,
- create_by=user_id
- )
- # 添加到数据库会话并提交
- type_list = body['type_list']
- if isinstance(type_list,list) and len(type_list)>0:
- db.add(new_person)
- db.commit()
- else:
- return JSONResponse(status_code=404,content={
- 'code': 404,
- 'msg': '责任类型不能为空'
- })
- try:
- for type_info in type_list:
- type_parent_id = type_info['type_parent_id']#get_type_parent_id_by_type_id(db,type_id)
- for type_id in type_info['children']:
- new_person_type = ThreeProofingResponsiblePersonType(
- type_parent_id = type_parent_id,
- type_id = type_id,
- person_id = new_person.id,
- create_by=user_id
- )
- db.add(new_person_type)
- if type_parent_id in ('5','7','9'):
- if 'children2' in type_info:
- for other_type_id in type_info['children2']:
- new_person_other_type = ThreeProofingResponsiblePersonOtherType(
- type_parent_id=type_parent_id,
- other_type_id=other_type_id,
- person_id=new_person.id,
- create_by=user_id
- )
- db.add(new_person_other_type)
- if type_parent_id in ('4','5','7','10','11'):
- dept_name = None
- if 'dept_name' in type_info:
- dept_name = type_info['dept_name']
- other_type_2_name = None
- if 'other_type_2_name' in type_info:
- other_type_2_name = type_info['other_type_2_name']
- denger_point_name = None
- if 'denger_point_name' in type_info:
- denger_point_name = type_info['denger_point_name']
- new_person_other_info = ThreeProofingResponsiblePersonOtherInfo(
- type_parent_id = type_parent_id,
- dept_name = dept_name,
- other_type_2_name = other_type_2_name,
- denger_point_name = denger_point_name,
- person_id = new_person.id,
- create_by=user_id
- )
- db.add(new_person_other_info)
- except:
- db.rollback()
- traceback.print_exc()
- new_person.del_flag='2'
- db.commit()
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "创建成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- db.rollback()
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.put('/update')
- async def update_contact(
- db: Session = Depends(get_db),
- body=Depends(remove_xss_json),
- user_id=Depends(valid_access_token)
- ):
- try:
- # 提取请求数据
- id = body['id']
- person = get_person_info_by_id(db,id)
- if not person:
- return JSONResponse(status_code=404, content={
- 'errcode': 404,
- 'errmsg': '责任人不存在'
- })
- person.unit_name = body['unit_name']
- person.name = body['name']
- person.area_code = body['area_code']
- person.position = body['position']
- person.phone = body['phone']
- if 'telephone' in body:
- person.telephone=body['telephone']
- person.order_num = body['order_num']
- person.unit_id = db_dept.get_dept_id_by_name(db, body['unit_name'])
- type_list = body['type_list']
- old_person_type_list=get_person_type_by_person_id(db,person.id)
- for old_person_type in old_person_type_list:
- old_person_type.del_flag='2'
- old_person_other_type_list = get_person_other_info_by_person_id(db,person.id)
- for old_person_other_type in old_person_other_type_list:
- old_person_other_type.del_flag = '2'
- for type_info in type_list:
- type_parent_id = type_info['type_parent_id'] # get_type_parent_id_by_type_id(db,type_id)
- for type_id in type_info['children']:
- new_person_type = ThreeProofingResponsiblePersonType(
- type_parent_id=type_parent_id,
- type_id=type_id,
- person_id=id,
- create_by=user_id
- )
- db.add(new_person_type)
- if type_parent_id in ('5', '7', '9'):
- if 'children2' in type_info:
- for other_type_id in type_info['children2']:
- new_person_other_type = ThreeProofingResponsiblePersonOtherType(
- type_parent_id=type_parent_id,
- other_type_id=other_type_id,
- person_id=person.id,
- create_by=user_id
- )
- db.add(new_person_other_type)
- if type_parent_id in ('4', '5', '7', '10', '11'):
- dept_name = None
- if 'dept_name' in type_info:
- dept_name = type_info['dept_name']
- other_type_2_name = None
- if 'other_type_2_name' in type_info:
- other_type_2_name = type_info['other_type_2_name']
- denger_point_name = None
- if 'denger_point_name' in type_info:
- denger_point_name = type_info['denger_point_name']
- new_person_other_info = ThreeProofingResponsiblePersonOtherInfo(
- type_parent_id=type_parent_id,
- dept_name=dept_name,
- other_type_2_name=other_type_2_name,
- denger_point_name=denger_point_name,
- person_id=person.id,
- create_by=user_id
- )
- db.add(new_person_other_info)
- # 更新到数据库会话并提交
- db.commit()
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "更新成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- db.rollback()
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.get('/list')
- async def get_emergency_contact_list(
- type_parent_id: str = Query(None, description='单位名称'),
- area_code:str = Query(None, description='单位名称'),
- Name: str = Query(None, description='联系人'),
- page: int = Query(1, gt=0, description='页码'),
- pageSize: int = Query(10, gt=0, description='每页条目数量'),
- db: Session = Depends(get_db),
- user_id=Depends(valid_access_token)
- ):
- try:
- # 构建查询
- query = db.query(ThreeProofingResponsiblePerson)
- query = query.filter(ThreeProofingResponsiblePerson.del_flag == '0')
- # 应用查询条件
- if type_parent_id:
- person_list = get_person_list_by_type_parent_id(db,type_parent_id)
- query = query.filter(ThreeProofingResponsiblePerson.id.in_(person_list))
- if Name:
- query = query.filter(ThreeProofingResponsiblePerson.name.like(f'%{Name}%'))
- if area_code:
- query = query.filter(ThreeProofingResponsiblePerson.area_code==area_code)
- # 计算总条目数
- total_items = query.count()
- # 排序
- query = query.order_by(ThreeProofingResponsiblePerson.order_num.asc())
- query = query.order_by(ThreeProofingResponsiblePerson.create_time.desc())
- # 执行分页查询
- contact_infos = query.offset((page - 1) * pageSize).limit(pageSize).all()
- # 将查询结果转换为列表形式的字典
- contact_infos_list = []
- for info in contact_infos:
- type_parent_id_list = get_type_parent_id_by_person_id(db,info.id)
- type_parent_list = []
- type_parent_list2 = []
- for type_parent in type_parent_id_list:
- if type_parent not in type_parent_list2:
- dict_data = get_dict_data_info(db,'three_proofing',type_parent)
- type_parent_list2.append(type_parent)
- type_parent_list.append({"type_parent_id":type_parent,"type_parent":dict_data.dict_label})
- area_info = id_get_area_info(db,info.area_code)
- user_info = user_id_get_user_info(db,info.create_by)
- area_list = db_area.id_get_area_parent_list(db,info.area_code,[])
- contact_infos_list.append({
- "id": info.id,
- "unit_id": info.unit_id,
- "unit_name": info.unit_name,
- "name": info.name,
- "area_list":area_list,
- "area_code": info.area_code,
- "area_name": area_info.area_name,
- "position": info.position,
- "phone": info.phone,
- "telephone":info.telephone,
- "order_num":info.order_num,
- "online_status":'0',
- "create_time": info.create_time.strftime('%Y-%m-%d %H:%M:%S'),
- "create_by":info.create_by,
- "create_user":user_info.nick_name,
- "type_parent_list":type_parent_list
- })
- # 返回结果+
- return {
- "code": 200,
- "msg": "成功",
- "data": contact_infos_list,
- "total": total_items
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.get('/info/{id}')
- async def get_emergency_contact_id_info(
- id: str,
- db: Session = Depends(get_db),
- user_id=Depends(valid_access_token)
- ):
- try:
- contact = get_person_info_by_id(db,id)
- if not contact:
- return JSONResponse(status_code=404, content={
- 'errcode': 404,
- 'errmsg': '联系人不存在'
- })
- # 将查询结果转换为列表形式的字典
- area_info = id_get_area_info(db,contact.area_code)
- user_info = user_id_get_user_info(db,contact.create_by)
- area_list = db_area.id_get_area_parent_list(db, contact.area_code, [])
- contact_result = {
- "id": contact.id,
- "unit_id": contact.unit_id,
- "unit_name": contact.unit_name,
- "name": contact.name,
- "area_list":area_list,
- "area_code":contact.area_code,
- "area_name": area_info.area_name,
- "position": contact.position,
- "phone": contact.phone,
- "telephone":contact.telephone,
- "order_num":contact.order_num,
- "online_status":'0',
- "create_time": contact.create_time.strftime('%Y-%m-%d %H:%M:%S'),
- "create_by":contact.create_by,
- "create_user":user_info.nick_name,
- "type_list":[]
- }
- type_parent_id_list = []
- type_list = get_person_type_by_person_id(db,contact.id)
- for type_info in type_list:
- if type_info.type_parent_id not in type_parent_id_list:
- type_parent_id_list.append(type_info.type_parent_id)
- for type_parent_id in type_parent_id_list:
- dict_data = get_dict_data_info(db, 'three_proofing', type_parent_id)
- type_data = {"type_parent_id": type_parent_id, "type_parent": dict_data.dict_label,"children":[]}
- for type_info in get_person_type_by_person_id_and_type_parent_id(db,contact.id,type_parent_id):
- type_data_info = get_type_info_by_id(db,type_info.type_id)
- type_data['children'].append({"id": type_info.type_id, "label": type_data_info.type_name})
- other_info = get_person_other_info_by_person_id_and_type_parent_id(db, contact.id, type_parent_id)
- if other_info:
- type_data['dept_name'] = other_info.dept_name
- type_data['denger_point_name'] = other_info.denger_point_name
- type_data['other_type_2_name'] = other_info.other_type_2_name
- other_type_list = get_person_other_type_by_person_id_and_type_parent_id(db, contact.id, type_parent_id)
- if other_type_list:
- type_data['children2'] = []
- for other_type in other_type_list:
- other_type_info = get_other_type_info_by_id(db, other_type.other_type_id)
- label= ''
- if other_type_info:
- label = other_type_info.type_name
- type_data['children2'].append({"id": other_type.other_type_id, "label": label})
- contact_result['type_list'].append(type_data)
- # 返回结果
- return {
- "code": 200,
- "msg": "成功",
- "data": contact_result
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.delete('/delete')
- async def delete_emergency_plans(
- ids: list,
- db: Session = Depends(get_db),
- body=Depends(remove_xss_json),
- user_id=Depends(valid_access_token)
- ):
- try:
- # 提取请求数据
- query = db.query(ThreeProofingResponsiblePerson)
- query = query.filter(ThreeProofingResponsiblePerson.del_flag != '2')
- query = query.filter(ThreeProofingResponsiblePerson.id.in_(ids))
- contacts = query.all()
- if not contacts:
- return JSONResponse(status_code=404, content={
- 'errcode': 404,
- 'errmsg': '联系人不存在'
- })
- for contact in contacts:
- contact.del_flag = '2'
- contact.create_by = user_id
- # 更新到数据库会话并提交
- db.commit()
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "删除成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.delete('/delete/{id}')
- async def delete_emergency_plans(
- id: int,
- db: Session = Depends(get_db),
- body=Depends(remove_xss_json),
- user_id=Depends(valid_access_token)
- ):
- try:
- contact = get_person_info_by_id(db, id)
- if not contact:
- return JSONResponse(status_code=404, content={
- 'errcode': 404,
- 'errmsg': '联系人不存在'
- })
- contact.del_flag = '2'
- contact.create_by = user_id
- # 更新到数据库会话并提交
- db.commit()
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "删除成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- #
- # @router.post('/createImport')
- # async def create_contact(
- # db: Session = Depends(get_db),
- # body=Depends(remove_xss_json),
- # user_id=Depends(valid_access_token)
- # ):
- # try:
- # # 提取请求数据
- # filename = body['filename']
- # if len(filename) == 0:
- # raise Exception()
- #
- # file_name = filename[0]['url']
- # file_path = f'/data/upload/mergefile/uploads/{file_name}'
- #
- # # 检查文件是否存在
- # if not os.path.isfile(file_path):
- # return JSONResponse(status_code=404, content={
- # 'errcode': 404,
- # 'errmsg': f'{file_name}不存在'
- # })
- #
- # # 定义预期的列名和对应的最大长度
- # expected_columns = {
- # '单位名称': 255,
- # '联系人': 255,
- # '职务': 255,
- # '粤政易手机号码': 20
- # }
- # try:
- # df = pd.read_excel(file_path, engine='openpyxl', header=0)
- # except Exception as e:
- # raise AppException(500, "请按模板上传!")
- #
- # # 读取Excel文件
- # # try:
- # # df = pd.read_excel(file_path, engine='openpyxl', header=0)
- # # except Exception as e:
- # # return f"读取Excel文件时出错: {e}"
- #
- # # 获取前两行的数据
- # first_two_rows = df
- # # print(first_two_rows)
- # # 检查列名是否匹配
- # actual_columns = first_two_rows.columns.tolist()
- # # print('actual_columns', actual_columns)
- # missing_columns = [col for col in expected_columns.keys() if col not in actual_columns]
- #
- # if len(missing_columns) > 0:
- # raise AppException(500, "请按模板上传")
- #
- # head1 = df.head(1)
- # head1data = head1.to_dict(orient='records')[0]
- # print(head1data)
- # if head1data['单位名称'] == '填写单位名称' and head1data['联系人'] == '填写单位联系人' and head1data['职务'] == '填写联系人职务' and \
- # head1data['粤政易手机号码'] == '填写联系人的粤政易注册手机号码':
- # df = df.drop(index=0)
- # else:
- # raise AppException(500, "请按模板上传。")
- #
- # # 检查前两行的字段长度
- # for index, row in first_two_rows.iterrows():
- # for col, max_length in expected_columns.items():
- # # if pd.isna(row[col]):
- # # return f"第{index + 1}行的'{col}'字段不能为空。"
- # if pd.notna(row[col]) and len(str(row[col])) > max_length:
- # raise AppException(500, f"第{index + 1}行的'{col}'字段长度超过{max_length}字符。")
- #
- # data = df.to_dict(orient='records')
- # # print(data)
- # infos = []
- # for info in data:
- # if pd.isna(info['单位名称']) and pd.isna(info['联系人']) and pd.isna(info['粤政易手机号码']):
- # continue
- # if pd.isna(info['单位名称']) or pd.isna(info['联系人']) or pd.isna(info['粤政易手机号码']):
- # return "单位名称、联系人、粤政易手机号码为必填"
- # if pd.isna(info['职务']):
- # info['职务'] = None
- # infos.append(info)
- #
- # # 创建新的预案记录
- # for contact in infos:
- # unit_id = db_dept.get_dept_id_by_name(db, contact['单位名称'])
- # if unit_id == '':
- # raise AppException(500, "单位名称不正确")
- #
- # # 删除之前同一个部门的人员
- # db.query(EmergencyContactInfo).filter(
- # and_(EmergencyContactInfo.del_flag == "0", EmergencyContactInfo.unit_id == unit_id, )).update(
- # {"del_flag": "2"})
- #
- # new_contact = EmergencyContactInfo(
- # unit_id=unit_id,
- # unit_name=contact['单位名称'],
- # contact_name=contact['联系人'],
- # position=contact['职务'],
- # yue_gov_ease_phone=contact['粤政易手机号码'],
- # create_by=user_id
- # )
- #
- # # 添加到数据库会话
- # db.add(new_contact)
- # # 提交
- # db.commit()
- #
- # # 返回创建成功的响应
- # return {
- # "code": 200,
- # "msg": "创建成功",
- # "data": None
- # }
- #
- # except AppException as e:
- # return {
- # "code": 500,
- # "msg": e.msg
- # }
- #
- # except Exception as e:
- # traceback.print_exc()
- # # 处理异常
- # raise HTTPException(status_code=500, detail=str(e))
|