#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Company management REST endpoints (list / detail / create / update / delete).

Rows of ``CompanyManagementBaseInfo`` are soft-deleted: the delete endpoints
set ``del_flag`` to ``'2'`` and every query in this module filters those rows
out.
"""

# NOTE: import order is kept exactly as in the original file because the
# wildcard imports make the order significant (later ones shadow earlier ones).
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status, BackgroundTasks
from common.security import valid_access_token
from fastapi.responses import JSONResponse
from utils.riskManagement_uitl import *
from utils.ry_system_util import *
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from pydantic import BaseModel
from datetime import datetime
from database import get_db
from typing import List
from models import *
from utils import *
import traceback
import json


router = APIRouter()


def _server_error(exc, db=None):
    """Print the active traceback and build the HTTP 500 error for ``exc``.

    Must be called from inside an ``except`` block so that
    ``traceback.print_exc()`` can see the exception being handled.

    Args:
        exc: The exception that was caught.
        db: Optional session; when given, its pending transaction is rolled
            back so a failed flush/commit does not leave it unusable.

    Returns:
        An ``HTTPException`` with status 500. ``detail`` is ``str(exc)``, or
        ``repr(exc)`` when the exception carries no message.
    """
    traceback.print_exc()
    if db is not None:
        db.rollback()
    return HTTPException(status_code=500, detail=str(exc) or repr(exc))


def _active_companies(db):
    """Return a query over companies that have not been soft-deleted."""
    return db.query(CompanyManagementBaseInfo).filter(
        CompanyManagementBaseInfo.del_flag != '2'
    )


def _region_name_resolver(db):
    """Return a ``code -> name`` lookup that memoises results.

    The same province/city/district codes tend to repeat across the rows of
    one response, so each distinct code is queried only once.
    """
    cache = {}

    def resolve(code):
        if code not in cache:
            cache[code] = city_code_get_city_name(db, code)
        return cache[code]

    return resolve


def _company_to_dict(company, resolve_name):
    """Convert a company row into the dictionary returned by the API.

    Args:
        company: A ``CompanyManagementBaseInfo`` row.
        resolve_name: Callable mapping a region code to its display name.

    Returns:
        A dict with the company fields; ``create_time`` is formatted as
        ``YYYY-MM-DD``, or ``None`` when the row has no creation time.
    """
    province = resolve_name(company.province_code)
    city = resolve_name(company.city_code)
    district = resolve_name(company.district_code)
    created = company.create_time
    return {
        "id": company.id,
        "company_name": company.company_name,
        "province": province,
        "province_code": company.province_code,
        "city": city,
        "city_code": company.city_code,
        "district": district,
        "district_code": company.district_code,
        "company_address": company.company_address,
        "responsible_person": company.responsible_person,
        "company_type": company.company_type,
        "phone": company.phone,
        # A missing create_time must not turn the whole response into a 500.
        "create_time": created.strftime('%Y-%m-%d') if created else None,
    }


@router.get('/company/list')
async def get_company_list(
    area_code: str = Query(None, description='区域代码'),
    keyword: str = Query(None, description='企业名称或企业负责人'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id=Depends(valid_access_token)
):
    """Return one page of companies, newest first.

    Args:
        area_code: When given, only companies whose ``district_code`` equals it.
        keyword: When given, substring match on company name or responsible
            person.
        page: 1-based page number.
        pageSize: Number of rows per page.

    Returns:
        A dict with ``code``, ``msg``, ``data`` (list of company dicts),
        ``total``, ``page``, ``pageSize`` and ``totalPages``.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        query = _active_companies(db)

        # Optional filters
        if area_code:
            query = query.filter(CompanyManagementBaseInfo.district_code == area_code)
        if keyword:
            pattern = f'%{keyword}%'
            query = query.filter(
                or_(
                    CompanyManagementBaseInfo.company_name.like(pattern),
                    CompanyManagementBaseInfo.responsible_person.like(pattern),
                )
            )

        # Count before pagination so the total reflects all matching rows.
        total_items = query.count()

        companies = (
            query.order_by(CompanyManagementBaseInfo.create_time.desc())
            .offset((page - 1) * pageSize)
            .limit(pageSize)
            .all()
        )

        resolve_name = _region_name_resolver(db)
        companies_list = [_company_to_dict(company, resolve_name) for company in companies]

        return {
            "code": 200,
            "msg": "成功",
            "data": companies_list,
            "total": total_items,
            "page": page,
            "pageSize": pageSize,
            # Ceiling division without floats.
            "totalPages": (total_items + pageSize - 1) // pageSize
        }
    except Exception as e:
        raise _server_error(e)


@router.get('/company/{id}')
async def get_company(
    id: str,
    db: Session = Depends(get_db),
    user_id=Depends(valid_access_token)
):
    """Return a single non-deleted company by its id.

    Returns:
        ``{"code": 200, "msg": ..., "data": {...}}`` on success, or a 404
        ``JSONResponse`` when no matching row exists.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        # The filter is applied unconditionally: a path parameter is never
        # empty, and skipping the filter would return an arbitrary company.
        company = _active_companies(db).filter(CompanyManagementBaseInfo.id == id).first()
        if not company:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})

        company_result = _company_to_dict(company, _region_name_resolver(db))

        return {
            "code": 200,
            "msg": "成功",
            "data": company_result
        }
    except Exception as e:
        # The previous handler read an unassigned local (`detail`) whenever
        # the exception message was empty, raising UnboundLocalError and
        # hiding the real error; the shared helper falls back to repr(e).
        raise _server_error(e)


@router.post('/company/create')
async def create_company(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create a company from the request body.

    The body must contain ``company_name``, ``province_code``, ``city_code``,
    ``district_code``, ``company_address``, ``responsible_person``,
    ``company_type`` and ``phone``. Region names are derived from the codes.

    Raises:
        HTTPException: 500 on any error, including a missing body key; the
            transaction is rolled back first.
    """
    try:
        new_company = CompanyManagementBaseInfo(
            id=new_guid(),
            company_name=body['company_name'],
            province=city_code_get_city_name(db, body['province_code']),
            province_code=body['province_code'],
            city=city_code_get_city_name(db, body['city_code']),
            city_code=body['city_code'],
            district=city_code_get_city_name(db, body['district_code']),
            district_code=body['district_code'],
            company_address=body['company_address'],
            responsible_person=body['responsible_person'],
            company_type=body['company_type'],
            phone=body['phone'],
            create_by=user_id
        )

        # Persist the new row.
        db.add(new_company)
        db.commit()

        return {
            "code": 200,
            "msg": "成功",
            "data": None
        }
    except Exception as e:
        raise _server_error(e, db)


def area_code_get_area_name(db, area_code):
    """Return the ``area_name`` of the ``GovdataArea`` row with this code, or ``''``."""
    area = db.query(GovdataArea).filter_by(area_code=area_code).first()
    if area:
        return area.area_name
    return ''


def city_code_get_city_name(db, area_code):
    """Return the ``name`` of the ``GovdataCity`` row whose id is ``area_code``, or ``''``."""
    city = db.query(GovdataCity).filter_by(id=area_code).first()
    if city:
        return city.name
    return ''


@router.put('/company/update')
async def update_company(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Partially update the company identified by ``body['id']``.

    Only keys present in the body are written. When a ``*_code`` key is
    present, the matching region name column is refreshed from the code.

    Returns:
        ``{"code": 200, ...}`` on success, or a 404 ``JSONResponse`` when the
        company does not exist or is soft-deleted.

    Raises:
        HTTPException: 500 on any error; the transaction is rolled back first.
    """
    try:
        company = (
            _active_companies(db)
            .filter(CompanyManagementBaseInfo.id == body['id'])
            .first()
        )
        if not company:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})

        # Region names are always derived from the codes, never taken from
        # the request body.
        for level in ('province', 'city', 'district'):
            code_key = f'{level}_code'
            if code_key in body:
                setattr(company, code_key, body[code_key])
                setattr(company, level, city_code_get_city_name(db, body[code_key]))

        # Plain columns copied through as-is.
        for field in ('company_name', 'company_address', 'responsible_person',
                      'company_type', 'phone'):
            if field in body:
                setattr(company, field, body[field])

        company.update_by = user_id

        db.commit()
        db.refresh(company)  # optional: reload the instance state after commit

        return {
            "code": 200,
            "msg": "成功",
            "data": None
        }
    except Exception as e:
        raise _server_error(e, db)


@router.delete('/company/delete')
async def delete_companys(
    companyIds: list,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete every non-deleted company whose id is in ``companyIds``.

    Returns:
        ``{"code": 200, ...}`` on success, or a 404 ``JSONResponse`` when none
        of the ids match a non-deleted company.

    Raises:
        HTTPException: 500 on any error; the transaction is rolled back first.
    """
    try:
        companies = (
            _active_companies(db)
            .filter(CompanyManagementBaseInfo.id.in_(companyIds))
            .all()
        )
        if not companies:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})

        for company in companies:
            company.del_flag = '2'
            company.update_by = user_id

        db.commit()

        return {
            "code": 200,
            "msg": "删除成功",
            "data": None
        }
    except Exception as e:
        raise _server_error(e, db)


@router.delete('/company/delete/{companyId}')
async def delete_company(
    companyId: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete the company with id ``companyId``.

    Returns:
        ``{"code": 200, ...}`` on success, or a 404 ``JSONResponse`` when the
        company does not exist or is already soft-deleted.

    Raises:
        HTTPException: 500 on any error; the transaction is rolled back first.
    """
    try:
        company = (
            _active_companies(db)
            .filter(CompanyManagementBaseInfo.id == companyId)
            .first()
        )
        if not company:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})

        company.del_flag = '2'
        company.update_by = user_id

        db.commit()
        db.refresh(company)  # optional: reload the instance state after commit

        return {
            "code": 200,
            "msg": "删除成功",
            "data": None
        }
    except Exception as e:
        raise _server_error(e, db)