12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971 |
- from fastapi import APIRouter, Request, Depends, HTTPException, Query, BackgroundTasks
- from sqlalchemy.exc import IntegrityError
- from fastapi.responses import HTMLResponse, FileResponse
- from fastapi.responses import JSONResponse
- from database import get_db
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.orm import Session
- from models import *
- import json
- import random
- import os
- from sqlalchemy import create_engine, select
- from typing import Optional
- from utils.StripTagsHTMLParser import *
- from utils.three_proofing_responsible_util import *
- from utils.ry_system_util import *
- from common.db import db_event_management, db_user, db_area, db_emergency_plan
- from common.security import valid_access_token
- import traceback
- from utils import *
- from datetime import datetime, timedelta
- import pandas as pd
- import xlrd
- from common.db import db_dept
- from exceptions import AppException, HmacException
- from common.enc import mpfun, three_proofing_responsible_person_data
- from common.db import db_czrz
- from common.auth_user import *
- router = APIRouter()
@router.post('/create')
async def create_contact(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create a three-proofing responsible person and its responsibility rows.

    Reads ``unit_name``, ``name``, ``area_code``, ``position``, ``phone`` and
    ``type_list`` from the request body (``telephone`` and ``order_num`` are
    optional). The person row is committed first so that its ``id`` is
    available for the type / other-type / other-info child rows.

    Returns a ``{"code": 200, ...}`` dict on success, a 404 ``JSONResponse``
    when ``type_list`` is empty or the area cannot be resolved, and raises
    ``HTTPException(500)`` for any other failure.
    """
    try:
        # Extract the request data.
        unit_name = body['unit_name']
        name = body['name']
        area_code = body['area_code']
        position = body['position']
        phone = body['phone']
        telephone = body['telephone'] if 'telephone' in body else ''
        order_num = body['order_num'] if 'order_num' in body else -1
        if order_num == '':
            # Same convention as the update handler: blank means "unordered".
            order_num = -1

        type_list = body['type_list']
        if not (isinstance(type_list, list) and len(type_list) > 0):
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '责任类型不能为空'
            })

        area_info = id_get_area_info(db, area_code)
        if not area_info:
            # Previously area_code2 was left unbound here, which surfaced as
            # an opaque 500 (UnboundLocalError).
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '责任区域未找到'
            })
        area_code2 = area_info.area_code

        unit_id = db_dept.get_dept_id_by_name(db, unit_name)
        user_id_1 = db_user.get_user_id_by_phonenumber(db, phone)

        # Create the person record and commit so new_person.id is populated.
        new_person = ThreeProofingResponsiblePerson(
            unit_id=unit_id,
            unit_name=unit_name,
            name=name,
            area_code2=area_code2,
            area_code=area_code,
            position=position,
            phone=phone,
            telephone=telephone,
            user_id=user_id_1,
            order_num=order_num,
            create_by=user_id
        )
        db.add(new_person)
        db.commit()
        three_proofing_responsible_person_data.sign_table()

        try:
            for type_info in type_list:
                type_parent_id = type_info['type_parent_id']
                for type_id in type_info['children']:
                    db.add(ThreeProofingResponsiblePersonType(
                        type_parent_id=type_parent_id,
                        type_id=type_id,
                        person_id=new_person.id,
                        create_by=user_id
                    ))
                # Only these parent categories carry a second-level type list.
                if type_parent_id in ('5', '7', '9') and 'children2' in type_info:
                    for other_type_id in type_info['children2']:
                        db.add(ThreeProofingResponsiblePersonOtherType(
                            type_parent_id=type_parent_id,
                            other_type_id=other_type_id,
                            person_id=new_person.id,
                            create_by=user_id
                        ))
                # Only these parent categories carry free-text extra info.
                if type_parent_id in ('4', '5', '7', '10', '11'):
                    db.add(ThreeProofingResponsiblePersonOtherInfo(
                        type_parent_id=type_parent_id,
                        dept_name=type_info.get('dept_name'),
                        other_type_2_name=type_info.get('other_type_2_name'),
                        denger_point_name=type_info.get('denger_point_name'),
                        person_id=new_person.id,
                        create_by=user_id
                    ))
        except Exception:
            # Child rows failed: discard them and soft-delete the person that
            # was already committed above.
            # NOTE(review): the handler still reports success in this case,
            # as the original did — confirm whether an error should be
            # returned instead.
            db.rollback()
            traceback.print_exc()
            new_person.del_flag = '2'
        db.commit()
        db_czrz.log(db, auth_user, "系统管理", f"后台管理新建三防责任人管理人员信息成功", request.client.host)
        return {
            "code": 200,
            "msg": "创建成功",
            "data": None
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@router.put('/update')
async def update_contact(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Update a responsible person and replace its responsibility rows.

    The person identified by ``body['id']`` has its scalar fields
    overwritten, its signature recomputed, every existing type /
    other-info / other-type row soft-deleted (``del_flag = '2'``) and a
    fresh set of rows created from ``body['type_list']``.

    Returns a ``{"code": 200, ...}`` dict on success, a 404 ``JSONResponse``
    when the person does not exist, and raises ``HTTPException(500)`` on any
    other failure (after rolling the session back).
    """
    try:
        person_id = body['id']
        person = get_person_info_by_id(db, person_id)
        if not person:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '责任人不存在'
            })

        # Scalar fields.
        person.unit_name = body['unit_name']
        person.name = body['name']
        person.area_code = body['area_code']
        area_info = id_get_area_info(db, body['area_code'])
        if area_info:
            person.area_code2 = area_info.area_code
        person.position = body['position']
        person.phone = mpfun.enc_data(body['phone'])
        if 'telephone' in body:
            person.telephone = mpfun.enc_data(body['telephone'])

        # A blank order number is stored as -1.
        requested_order = body['order_num']
        person.order_num = -1 if requested_order == '' else requested_order

        person.unit_id = db_dept.get_dept_id_by_name(db, body['unit_name'])
        person.update_by = user_id
        person.update_time = datetime.now()
        person.sign = three_proofing_responsible_person_data.get_sign_hmac(person)

        type_list = body['type_list']

        # Soft-delete every row currently attached to the person.
        stale_row_loaders = (
            get_person_type_by_person_id,
            get_person_other_info_by_person_id,
            get_person_other_type_by_person_id,
        )
        for load_rows in stale_row_loaders:
            for stale_row in load_rows(db, person.id):
                stale_row.del_flag = '2'

        # Re-create the rows from the submitted type list.
        for type_info in type_list:
            type_parent_id = type_info['type_parent_id']

            for type_id in type_info['children']:
                db.add(ThreeProofingResponsiblePersonType(
                    type_parent_id=type_parent_id,
                    type_id=type_id,
                    person_id=person_id,
                    create_by=user_id
                ))

            if type_parent_id in ('5', '7', '9') and 'children2' in type_info:
                for other_type_id in type_info['children2']:
                    db.add(ThreeProofingResponsiblePersonOtherType(
                        type_parent_id=type_parent_id,
                        other_type_id=other_type_id,
                        person_id=person.id,
                        create_by=user_id
                    ))

            if type_parent_id in ('4', '5', '7', '10', '11'):
                db.add(ThreeProofingResponsiblePersonOtherInfo(
                    type_parent_id=type_parent_id,
                    dept_name=type_info.get('dept_name'),
                    other_type_2_name=type_info.get('other_type_2_name'),
                    denger_point_name=type_info.get('denger_point_name'),
                    person_id=person.id,
                    create_by=user_id
                ))

        # Persist everything in one commit.
        db.commit()
        db_czrz.log(db, auth_user, "系统管理", f"后台管理更新三防责任人管理人员信息成功", request.client.host)
        return {
            "code": 200,
            "msg": "更新成功",
            "data": None
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/list')
async def get_emergency_contact_list(
    type_parent_id: str = Query(None, description='单位名称'),
    area_code:str = Query(None, description='单位名称'),
    Name: str = Query(None, description='联系人'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id=Depends(valid_access_token)
):
    """Return one page of non-deleted responsible persons.

    Optional filters: ``type_parent_id`` (persons having a row under that
    parent category), ``Name`` (substring match on the person name) and
    ``area_code`` (an area id that is resolved to its area code; trailing
    zero groups are stripped and matched against ``area_code2``).

    Every row on the page has its HMAC signature checked; a mismatch yields
    a ``{"code", "msg"}`` error dict. Other failures raise
    ``HTTPException(500)``.

    NOTE(review): the ``description`` of ``type_parent_id`` and
    ``area_code`` looks copy-pasted from another parameter — confirm and
    correct the OpenAPI text.
    """
    try:
        # Base query: only live rows.
        query = db.query(ThreeProofingResponsiblePerson)
        query = query.filter(ThreeProofingResponsiblePerson.del_flag == '0')

        if type_parent_id:
            person_list = get_person_list_by_type_parent_id(db, type_parent_id)
            query = query.filter(ThreeProofingResponsiblePerson.id.in_(person_list))
        if Name:
            query = query.filter(ThreeProofingResponsiblePerson.name.like(f'%{Name}%'))
        if area_code:
            area_info = id_get_area_info(db, area_code)
            if area_info:
                area_code = area_info.area_code
            # Drop the zero padding so that a parent area also matches its
            # descendants, e.g. '440900000000' -> '4409'.
            area_code = (
                area_code.replace('0000000000', '')
                .replace('00000000', '')
                .replace('000000', '')
                .replace('000', '')
            )
            query = query.filter(ThreeProofingResponsiblePerson.area_code2.like(f'%{area_code}%'))

        # Count before pagination is applied.
        total_items = query.count()

        query = query.order_by(
            ThreeProofingResponsiblePerson.order_num.asc(),
            ThreeProofingResponsiblePerson.create_time.desc(),
        )
        contact_infos = query.offset((page - 1) * pageSize).limit(pageSize).all()

        for info in contact_infos:
            # Explicit comparison kept: only a literal False result is treated
            # as tampering.
            if three_proofing_responsible_person_data.sign_valid_row(info) == False:  # noqa: E712
                raise HmacException(500, "三防责任人管理人员信息表验证异常,已被非法篡改")

        contact_infos_list = []
        for info in contact_infos:
            # Distinct parent categories, in first-seen order.
            seen_parents = []
            type_parent_list = []
            for type_parent in get_type_parent_id_by_person_id(db, info.id):
                if type_parent in seen_parents:
                    continue
                seen_parents.append(type_parent)
                dict_data = get_dict_data_info(db, 'three_proofing', type_parent)
                type_parent_list.append({
                    "type_parent_id": type_parent,
                    # A missing dictionary entry must not fail the whole page.
                    "type_parent": dict_data.dict_label if dict_data else ''
                })

            area_info = id_get_area_info(db, info.area_code)
            user_info = user_id_get_user_info(db, info.create_by)
            area_list = db_area.id_get_area_parent_list(db, info.area_code, [])
            # -1 is the stored marker for "no order number".
            order_num = '' if info.order_num == -1 else str(info.order_num)

            contact_infos_list.append({
                "id": info.id,
                "unit_id": info.unit_id,
                "unit_name": info.unit_name,
                "name": info.name,
                "area_list": area_list,
                "area_code": info.area_code,
                # Dangling area / creator references degrade to '' instead of
                # raising AttributeError.
                "area_name": area_info.area_name if area_info else '',
                "position": info.position,
                "phone": mpfun.dec_data(info.phone),
                "telephone": mpfun.dec_data(info.telephone),
                "order_num": order_num,
                "online_status": '0',
                "create_time": info.create_time.strftime('%Y-%m-%d %H:%M:%S') if info.create_time else '',
                "create_by": info.create_by,
                "create_user": user_info.nick_name if user_info else '',
                "type_parent_list": type_parent_list
            })

        return {
            "code": 200,
            "msg": "成功",
            "data": contact_infos_list,
            "total": total_items
        }

    except HmacException as e:
        return {
            "code": e.code,
            "msg": e.msg
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/info/{id}')
async def get_emergency_contact_id_info(
    id: str,
    db: Session = Depends(get_db),
    user_id=Depends(valid_access_token)
):
    """Return the full detail of one responsible person.

    The payload contains the person's scalar fields plus ``type_list``: one
    entry per parent category with the selected type ids (``children``),
    their names (``labelData``) and, where present, the extra info fields
    and second-level type ids (``children2`` / ``other_type_label``).

    Returns a 404 ``JSONResponse`` when the person does not exist and raises
    ``HTTPException(500)`` on any other failure.
    """
    try:
        contact = get_person_info_by_id(db, id)
        if not contact:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '联系人不存在'
            })

        area_info = id_get_area_info(db, contact.area_code)
        user_info = user_id_get_user_info(db, contact.create_by)
        area_list = db_area.id_get_area_parent_list(db, contact.area_code, [])
        # -1 is the stored marker for "no order number".
        order_num = '' if contact.order_num == -1 else str(contact.order_num)

        contact_result = {
            "id": contact.id,
            "unit_id": contact.unit_id,
            "unit_name": contact.unit_name,
            "name": contact.name,
            "area_list": area_list,
            "area_code": contact.area_code,
            # Dangling area / creator references degrade to '' instead of
            # raising AttributeError.
            "area_name": area_info.area_name if area_info else '',
            "position": contact.position,
            "phone": mpfun.dec_data(contact.phone),
            "telephone": mpfun.dec_data(contact.telephone),
            "order_num": order_num,
            "online_status": '0',
            "create_time": contact.create_time.strftime('%Y-%m-%d %H:%M:%S') if contact.create_time else '',
            "create_by": contact.create_by,
            "create_user": user_info.nick_name if user_info else '',
            "type_list": []
        }

        # Distinct parent categories, in first-seen order.
        type_parent_id_list = []
        for type_info in get_person_type_by_person_id(db, contact.id):
            if type_info.type_parent_id not in type_parent_id_list:
                type_parent_id_list.append(type_info.type_parent_id)

        for type_parent_id in type_parent_id_list:
            dict_data = get_dict_data_info(db, 'three_proofing', type_parent_id)
            type_data = {
                "type_parent_id": type_parent_id,
                "type_parent": dict_data.dict_label if dict_data else '',
                "children": [],
                "labelData": []
            }
            for type_info in get_person_type_by_person_id_and_type_parent_id(db, contact.id, type_parent_id):
                type_data_info = get_type_info_by_id(db, type_info.type_id)
                type_data['children'].append(type_info.type_id)
                # Keep ids and labels aligned even if the type row is gone.
                type_data['labelData'].append(type_data_info.type_name if type_data_info else '')

            other_info = get_person_other_info_by_person_id_and_type_parent_id(db, contact.id, type_parent_id)
            if other_info:
                type_data['dept_name'] = other_info.dept_name
                type_data['denger_point_name'] = other_info.denger_point_name
                type_data['other_type_2_name'] = other_info.other_type_2_name

            other_type_list = get_person_other_type_by_person_id_and_type_parent_id(db, contact.id, type_parent_id)
            if other_type_list:
                type_data['children2'] = []
                type_data['other_type_label'] = []
                for other_type in other_type_list:
                    other_type_info = get_other_type_info_by_id(db, other_type.other_type_id)
                    label = other_type_info.type_name if other_type_info else ''
                    type_data['children2'].append(other_type.other_type_id)
                    type_data['other_type_label'].append(label)

            contact_result['type_list'].append(type_data)

        return {
            "code": 200,
            "msg": "成功",
            "data": contact_result
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@router.delete('/delete')
async def delete_emergency_plans(
    request: Request,
    ids: list,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Soft-delete several responsible persons at once.

    Every not-yet-deleted person whose id is in ``ids`` gets
    ``del_flag = '2'``, updated audit fields and a recomputed signature.

    Returns a ``{"code": 200, ...}`` dict on success, a 404 ``JSONResponse``
    when none of the ids match, and raises ``HTTPException(500)`` on any
    other failure after rolling the session back.
    """
    try:
        query = db.query(ThreeProofingResponsiblePerson)
        query = query.filter(ThreeProofingResponsiblePerson.del_flag != '2')
        query = query.filter(ThreeProofingResponsiblePerson.id.in_(ids))
        contacts = query.all()
        if not contacts:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '联系人不存在'
            })
        for contact in contacts:
            contact.del_flag = '2'
            contact.update_by = user_id
            contact.update_time = datetime.now()
            # The signature covers the row contents, so it is refreshed after
            # the fields above change.
            contact.sign = three_proofing_responsible_person_data.get_sign_hmac(contact)
        db.commit()
        db_czrz.log(db, auth_user, "系统管理", f"后台管理删除三防责任人管理人员信息成功", request.client.host)
        return {
            "code": 200,
            "msg": "删除成功",
            "data": None
        }
    except Exception as e:
        # Discard the partially applied changes, as the create/update
        # handlers do, so the session is not left in a failed state.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@router.delete('/delete/{id}')
async def delete_emergency_plans(
    request: Request,
    id: int,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Soft-delete a single responsible person by id.

    Sets ``del_flag = '2'``, updates the audit fields and recomputes the
    row signature.

    Returns a ``{"code": 200, ...}`` dict on success, a 404 ``JSONResponse``
    when the person does not exist, and raises ``HTTPException(500)`` on any
    other failure after rolling the session back.
    """
    try:
        contact = get_person_info_by_id(db, id)
        if not contact:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '联系人不存在'
            })
        contact.del_flag = '2'
        contact.update_by = user_id
        contact.update_time = datetime.now()
        # The signature covers the row contents, so it is refreshed after the
        # fields above change.
        contact.sign = three_proofing_responsible_person_data.get_sign_hmac(contact)
        db.commit()
        db_czrz.log(db, auth_user, "系统管理", f"后台管理删除三防责任人管理人员信息成功", request.client.host)
        return {
            "code": 200,
            "msg": "删除成功",
            "data": None
        }
    except Exception as e:
        # Discard the partially applied changes, as the create/update
        # handlers do, so the session is not left in a failed state.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
def string_type_parent_id_create_data(db,string,type_parent_id,file_info,new_person,user_id,row) :
    """Build person-type rows from a comma-separated list of type names.

    Each name in ``string`` is resolved to a type id under
    ``type_parent_id``. The rows are only constructed here; adding them to
    the session is left to the caller.

    Returns ``(rows, True)`` when every name resolves. On the first unknown
    name an error line is appended to ``file_info.remark``,
    ``file_info.error_num`` is incremented and ``(rows_so_far, False)`` is
    returned.
    """
    reslte = []
    for type_name in string.split(','):
        type_id = get_type_id_by_type_parent_id_and_type_name(db, type_parent_id, type_name)
        if not type_id:
            file_info.remark = file_info.remark + f'\n行<{row+1}>责任类别未找到<{type_name}>'
            # Count the error like every other import failure path does;
            # this increment was missing here.
            file_info.error_num += 1
            return reslte, False
        reslte.append(ThreeProofingResponsiblePersonType(
            type_parent_id=type_parent_id,
            type_id=type_id,
            person_id=new_person.id,
            create_by=user_id
        ))
    return reslte, True
def other_type_string_type_parent_id_create_data(db,string,type_parent_id,file_info,new_person,user_id,row) :
    """Build person other-type rows from a comma-separated list of names.

    Each name in ``string`` is resolved to an other-type id under
    ``type_parent_id``. The rows are only constructed here; adding them to
    the session is left to the caller.

    Returns ``(rows, True)`` when every name resolves. On the first unknown
    name an error line is appended to ``file_info.remark``,
    ``file_info.error_num`` is incremented and ``(rows_so_far, False)`` is
    returned.
    """
    built_rows = []
    for other_type_name in string.split(','):
        resolved_id = get_other_type_id_by_type_parent_id_and_other_type_name(
            db, type_parent_id, other_type_name
        )
        if not resolved_id:
            # Unknown name: report it and stop at the first failure.
            file_info.remark = file_info.remark + f'\n行<{row+1}>责任类别未找到<{other_type_name}>'
            file_info.error_num += 1
            return built_rows, False
        built_rows.append(
            ThreeProofingResponsiblePersonOtherType(
                type_parent_id=type_parent_id,
                other_type_id=resolved_id,
                person_id=new_person.id,
                create_by=user_id
            )
        )
    return built_rows, True
# Responsibility columns of the import sheet, processed left to right.
# Each entry is (column index, kind, type_parent_id) where kind is:
#   'type'        -> comma-separated type names (person-type rows)
#   'other_type'  -> comma-separated second-level type names
#   anything else -> name of the free-text other-info field to fill
_IMPORT_COLUMN_PLAN = (
    (6, 'type', '1'),                 # party committee / government
    (8, 'type', '2'),                 # three-proofing command headquarters
    (10, 'type', '3'),                # emergency department
    (11, 'type', '4'),                # member units
    (12, 'dept_name', '4'),
    (13, 'type', '5'),                # key departments
    (14, 'dept_name', '5'),
    (15, 'other_type', '5'),
    (16, 'type', '6'),                # administrative villages
    (17, 'type', '7'),                # water conservancy projects
    (18, 'dept_name', '7'),
    # NOTE(review): the original code used parent '11' (not '7') for this
    # column; kept as-is — confirm it is intentional.
    (19, 'other_type', '11'),
    (20, 'type', '8'),                # threatened-population relocation
    (21, 'type', '9'),                # rescue teams
    (22, 'other_type', '9'),
    (23, 'type', '10'),               # geological hazards
    (24, 'denger_point_name', '10'),
    (25, 'type', '11'),               # other
    (26, 'other_type_2_name', '11'),
)


def _import_cell_text(value):
    """Render a sheet cell as text, without the '.0' of integral floats."""
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


def import_data(db,file_path,user_id,file_info):
    """Import responsible persons from the first sheet of a workbook.

    Data rows start at sheet row index 4. For each row a person is created
    and committed, then the responsibility columns listed in
    ``_IMPORT_COLUMN_PLAN`` are turned into child rows. Validation problems
    are appended to ``file_info.remark`` and counted in
    ``file_info.error_num``; if any occurred, every object created by this
    import is deleted again. Finally the table signatures are refreshed.

    :param db: SQLAlchemy session used for all reads and writes.
    :param file_path: path of the uploaded workbook.
    :param user_id: id recorded as ``create_by`` on every created row.
    :param file_info: import status record updated in place.
    """
    import_status = True
    try:
        book = xlrd.open_workbook(file_path)
        sheet = book.sheet_by_index(0)
    except Exception:
        # Without a sheet there is nothing to iterate; record the failure and
        # stop instead of falling through to an unbound `sheet`.
        traceback.print_exc()
        file_info.remark = file_info.remark + '\n文件打开失败,请核实文件格式为xlsx/xlx>'
        file_info.error_num += 1
        file_info.status = 2
        db.commit()
        return

    data = []
    for row in range(4, sheet.nrows):
        name = sheet.cell(row, 0).value
        unit_name = sheet.cell(row, 1).value
        position = sheet.cell(row, 2).value
        # Phone numbers typed as numbers arrive as floats; render them
        # without the trailing '.0'. Several numbers are separated by ','.
        phone = _import_cell_text(sheet.cell(row, 3).value)

        required_cells = (
            ('姓名', name),
            ('所属单位', unit_name),
            ('职务', position),
            ('电话号码', phone),
        )
        for label, value in required_cells:
            if value == '':
                file_info.remark = file_info.remark + f'\n行<{row+1}>{label}不能为空<{value}>'
                file_info.error_num += 1
                import_status = False

        unit_id = db_dept.get_dept_id_by_name(db, unit_name)

        # Link to the first phone number that resolves to a user (-1 = none).
        user_id_1 = -1
        for candidate in phone.split(','):
            user_id_1 = db_user.get_user_id_by_phonenumber(db, candidate)
            if user_id_1 != -1:
                break

        # Optional office telephone.
        telephone = sheet.cell(row, 4).value
        # Optional sort order; blank is stored as -1.
        order_num = sheet.cell(row, 5).value
        if order_num == '':
            order_num = -1

        area_name = sheet.cell(row, 7).value
        if area_name == '':
            # Hard-coded default area used when the cell is blank.
            area_code = 2
            area_code2 = '440900000000'
        else:
            area = get_area_info_by_area_name(db, area_name)
            if area is None:
                file_info.remark = file_info.remark + f'\n行<{row+1}>责任区域未找到'
                file_info.error_num += 1
                import_status = False
                # The person cannot be built without an area; skip the row.
                continue
            area_code2 = area.area_code
            area_code = area.id

        new_person = ThreeProofingResponsiblePerson(
            unit_id=unit_id,
            unit_name=unit_name,
            name=name,
            area_code2=area_code2,
            area_code=area_code,
            position=position,
            # Was `phone.replace` (a bound method, not the phone string).
            phone=phone,
            telephone=telephone,
            user_id=user_id_1,
            order_num=order_num,
            create_by=user_id,
            sign=''
        )
        data.append(new_person)
        db.add(new_person)
        # Commit now so new_person.id is available for the child rows.
        db.commit()

        for column, kind, parent_id in _IMPORT_COLUMN_PLAN:
            cell_value = sheet.cell(row, column).value
            if cell_value == '':
                continue
            if kind in ('type', 'other_type'):
                if kind == 'type':
                    build_rows = string_type_parent_id_create_data
                else:
                    build_rows = other_type_string_type_parent_id_create_data
                new_type_list, status = build_rows(
                    db, cell_value, parent_id, file_info, new_person, user_id, row
                )
                if status:
                    db.add_all(new_type_list)
                    data += new_type_list
                else:
                    import_status = status
            else:
                new_person_other_info = ThreeProofingResponsiblePersonOtherInfo(
                    type_parent_id=parent_id,
                    person_id=new_person.id,
                    create_by=user_id,
                    **{kind: cell_value}
                )
                db.add(new_person_other_info)
                data.append(new_person_other_info)
        db.commit()

    if not import_status:
        # At least one row was invalid: undo everything this import created.
        for info in data:
            db.delete(info)
    # NOTE(review): the original nesting of this assignment could not be
    # recovered; it is treated as "import finished" for both outcomes —
    # confirm against the status consumer.
    file_info.status = 2
    db.commit()
    three_proofing_responsible_person_data.sign_table()
@router.post('/createImport')
async def create_contact(
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Register an uploaded workbook and schedule its import.

    ``body['filename']`` names a file under the upload directory and
    ``body['file_name_desc']`` is its display name. An import status record
    is stored and ``import_data`` is queued as a background task.

    Returns a ``{"code": 200, ...}`` dict once the task is queued, a 404
    ``JSONResponse`` when the file does not exist (or the name points
    outside the upload directory), and raises ``HTTPException(500)`` on any
    other failure.
    """
    try:
        filename = body['filename']
        file_name_desc = body['file_name_desc']
        if len(filename) == 0:
            raise Exception()

        upload_root = '/data/upload/mergefile/uploads'
        file_path = f'/data/upload/mergefile/uploads/{filename}'
        # The name comes from the client: refuse anything that resolves
        # outside the upload directory (e.g. '../../etc/passwd').
        inside_root = os.path.commonpath(
            [upload_root, os.path.abspath(file_path)]
        ) == upload_root
        if not inside_root or not os.path.isfile(file_path):
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': f'{filename}不存在'
            })

        new_file = ThreeProofingResponsiblePersonImportFileStatus(
            file_uuid=filename,
            file_name=file_name_desc,
            status='1',
            remark='',
            user_id=user_id
        )
        db.add(new_file)
        db.commit()
        # NOTE(review): the task receives this request's session and ORM
        # object; confirm they are still usable once the response is sent.
        background_tasks.add_task(import_data, db, file_path, user_id, new_file)

        db_czrz.log(db, auth_user, "系统管理", f"后台管理导入三防责任人管理人员信息成功", request.client.host)
        return {
            "code": 200,
            "msg": "成功",
            "data": None
        }
    except AppException as e:
        return {
            "code": 500,
            "msg": e.msg
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/export")
async def download_file(filename: str,filenameDesc: str = None):
    """Download a file from the ``uploads/`` directory.

    :param filename: name of the file to download, relative to ``uploads/``.
    :param filenameDesc: optional name offered to the client for saving;
        defaults to ``filename``.
    :raises HTTPException: 404 when the file does not exist or the name
        resolves outside ``uploads/``; 500 for any other failure.
    """
    try:
        upload_root = os.path.abspath(os.path.join('', 'uploads/'))
        file_path = os.path.join('', 'uploads/', filename)
        # `filename` is client input: an absolute path or '..' segments would
        # otherwise let the caller read arbitrary files.
        if os.path.commonpath([upload_root, os.path.abspath(file_path)]) != upload_root:
            raise HTTPException(status_code=404, detail="文件未找到")
        if not os.path.isfile(file_path):
            raise HTTPException(status_code=404, detail="文件未找到")
        if not filenameDesc:
            filenameDesc = filename
        # Let FileResponse build Content-Disposition: it quotes the name and
        # percent-encodes non-ASCII names, which a hand-built header cannot
        # carry safely.
        return FileResponse(
            path=file_path,
            filename=filenameDesc,
            media_type='application/octet-stream'
        )
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|