from fastapi import APIRouter, Request, Depends, HTTPException, Query from sqlalchemy.exc import IntegrityError from fastapi.responses import HTMLResponse, FileResponse from fastapi.responses import JSONResponse from database import get_db from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.orm import Session from models import * import json import random import os from sqlalchemy import create_engine, select from typing import Optional from utils.StripTagsHTMLParser import * from common.db import db_event_management, db_user, db_area, db_emergency_plan from common.security import valid_access_token import traceback from utils import * from datetime import datetime, timedelta import pandas as pd from common.db import db_dept from exceptions import AppException router = APIRouter() @router.get('/list') async def get_emergency_contact_list( unitId: str = Query(None, description='单位名称'), contactName: str = Query(None, description='联系人'), page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 构建查询 query = db.query(EmergencyContactInfo) query = query.filter(EmergencyContactInfo.del_flag == '0') # 应用查询条件 if unitId: query = query.filter(EmergencyContactInfo.unit_id == unitId) if contactName: query = query.filter(EmergencyContactInfo.contact_name.like(f'%{contactName}%')) # 计算总条目数 total_items = query.count() # 排序 query = query.order_by(EmergencyContactInfo.id.asc()) # 执行分页查询 contact_infos = query.offset((page - 1) * pageSize).limit(pageSize).all() # 将查询结果转换为列表形式的字典 contact_infos_list = [ { "id": info.id, "unitId": info.unit_id, "unitName": info.unit_name, "contactName": info.contact_name, "position": info.position, "phone": info.yue_gov_ease_phone, "create_time": info.create_time.strftime('%Y-%m-%d') } for info in contact_infos ] # 返回结果 return { "code": 200, "msg": "成功", "data": contact_infos_list, "total": total_items } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.get('/info/{id}') async def get_emergency_contact_id_info( id: str , db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 构建查询 query = db.query(EmergencyContactInfo) query = query.filter(EmergencyContactInfo.del_flag != '2') # 应用查询条件 if id: query = query.filter(EmergencyContactInfo.id == id) else: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) # 执行查询 contact = query.first() if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) # 将查询结果转换为列表形式的字典 contact_result = { "id": contact.id, "unitId": contact.unit_id, "unitName": contact.unit_name, "contactName": contact.contact_name, "position": contact.position, "phone": contact.yue_gov_ease_phone, "create_time": contact.create_time.strftime('%Y-%m-%d') } # 返回结果 return { "code": 200, "msg": "成功", "data": contact_result } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.post('/create') async def create_contact( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 unit_id = body['unitId'] contact_name = body['contactName'] position = body['position'] yue_gov_ease_phone = body['phone'] unit_name = db_dept.get_dept_name_by_id(db, unit_id) if unit_name == '': raise Exception('单位不正确') # 创建新的预案记录 new_contact = EmergencyContactInfo( unit_id = unit_id, unit_name = unit_name, contact_name = contact_name, position = position, yue_gov_ease_phone = yue_gov_ease_phone, create_by = user_id ) # 添加到数据库会话并提交 db.add(new_contact) db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "创建成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.put('/update') async def update_contact( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(EmergencyContactInfo) query = query.filter(EmergencyContactInfo.del_flag != '2') id = body['id'] query = query.filter(EmergencyContactInfo.id == id) contact = query.first() if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) if "unitId" in body: unit_name = db_dept.get_dept_name_by_id(db, body['unitId']) if unit_name == '': raise Exception('单位不正确') contact.unit_id = body['unitId'] contact.unit_name = unit_name if "contactName" in body: contact.contact_name = body['contactName'] if "position" in body: contact.position = body['position'] if "phone" in body: contact.yue_gov_ease_phone = body['phone'] # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "更新成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete') async def delete_emergency_plans( ids: list, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(EmergencyContactInfo) query = query.filter(EmergencyContactInfo.del_flag != '2') query = query.filter(EmergencyContactInfo.id.in_(ids)) contacts = query.all() if not contacts: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) for contact in contacts: contact.del_flag = '2' contact.create_by=user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete/{id}') async def delete_emergency_plans( id: int, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(EmergencyContactInfo) query = query.filter(EmergencyContactInfo.del_flag != '2') query = query.filter(EmergencyContactInfo.id==id) contact = query.first() if not contact: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '联系人不存在' }) contact.del_flag = '2' contact.create_by=user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.post('/createImport') async def create_contact( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 filename = body['filename'] if len(filename) == 0: raise Exception() file_name = filename[0]['url'] file_path = f'/data/upload/mergefile/uploads/{file_name}' # 检查文件是否存在 if not os.path.isfile(file_path): return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': f'{file_name}不存在' }) # 定义预期的列名和对应的最大长度 expected_columns = { '单位名称': 255, '联系人': 255, '职务': 255, '粤政易手机号码': 20 } try: df = pd.read_excel(file_path, engine='openpyxl', header=0) except Exception as e: raise AppException(500, "请按模板上传!") # 读取Excel文件 # try: # df = pd.read_excel(file_path, engine='openpyxl', header=0) # except Exception as e: # return f"读取Excel文件时出错: {e}" # 获取前两行的数据 first_two_rows = df # print(first_two_rows) # 检查列名是否匹配 actual_columns = first_two_rows.columns.tolist() # print('actual_columns', actual_columns) missing_columns = [col for col in expected_columns.keys() if col not in actual_columns] if len(missing_columns) > 0: raise AppException(500, "请按模板上传") head1 = df.head(1) head1data = head1.to_dict(orient='records')[0] print(head1data) if head1data['单位名称'] == '填写单位名称' and head1data['联系人'] == '填写单位联系人' and head1data['职务'] == '填写联系人职务' and \ head1data['粤政易手机号码'] == '填写联系人的粤政易注册手机号码': df = df.drop(index=0) else: raise AppException(500, "请按模板上传。") # 检查前两行的字段长度 for index, row in first_two_rows.iterrows(): for col, max_length in expected_columns.items(): # if pd.isna(row[col]): # return f"第{index + 1}行的'{col}'字段不能为空。" if pd.notna(row[col]) and len(str(row[col])) > max_length: raise AppException(500, f"第{index + 1}行的'{col}'字段长度超过{max_length}字符。") data = df.to_dict(orient='records') # print(data) infos = [] for info in data: if pd.isna(info['单位名称']) and pd.isna(info['联系人']) and pd.isna(info['粤政易手机号码']): continue if pd.isna(info['单位名称']) or pd.isna(info['联系人']) or pd.isna(info['粤政易手机号码']): return "单位名称、联系人、粤政易手机号码为必填" if pd.isna(info['职务']): info['职务'] = None infos.append(info) # 创建新的预案记录 for contact in infos: unit_id = db_dept.get_dept_id_by_name(db, contact['单位名称']) if unit_id == '': raise AppException(500, "单位名称不正确") # 删除之前同一个部门的人员 db.query(EmergencyContactInfo).filter(and_(EmergencyContactInfo.del_flag == "0", EmergencyContactInfo.unit_id == unit_id, )).update({"del_flag": "2"}) new_contact = EmergencyContactInfo( unit_id=unit_id, unit_name=contact['单位名称'], contact_name = contact['联系人'], position = contact['职务'], yue_gov_ease_phone = contact['粤政易手机号码'], create_by = user_id ) # 添加到数据库会话 db.add(new_contact) # 提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "创建成功", "data": None } except AppException as e: return { "code": 500, "msg": e.msg } except Exception as e: traceback.print_exc() # 处理异常 raise HTTPException(status_code=500, detail=str(e))