123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends,Query, HTTPException, status,BackgroundTasks
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse
- from utils.riskManagement_uitl import *
- from utils.ry_system_util import *
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from pydantic import BaseModel
- from datetime import datetime
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- import traceback
- import json
- router = APIRouter()
- @router.get('/company/list')
- async def get_company_list(
- area_code: str = Query(None, description='区域代码'),
- keyword :str = Query(None, description='企业名称或企业负责人'),
- page: int = Query(1, gt=0, description='页码'),
- pageSize: int = Query(10, gt=0, description='每页条目数量'),
- db: Session = Depends(get_db),
- user_id = Depends(valid_access_token)
- ):
- try:
- # 构建查询
- query = db.query(CompanyManagementBaseInfo)
- query = query.filter(CompanyManagementBaseInfo.del_flag != '2')
- # 应用查询条件
- if area_code:
- query = query.filter(CompanyManagementBaseInfo.district_code == area_code)
- if keyword:
- query = query.filter(or_(CompanyManagementBaseInfo.company_name.like(f'%{keyword}%'),
- CompanyManagementBaseInfo.responsible_person.like(f'%{keyword}%')))
- # 计算总条目数
- total_items = query.count()
- # 排序
- query = query.order_by(CompanyManagementBaseInfo.create_time.desc())
- # 执行分页查询
- Companys = query.offset((page - 1) * pageSize).limit(pageSize).all()
- # 将查询结果转换为列表形式的字典
- Companys_list = []
- for company in Companys:
- company_info = {
- "id": company.id,
- "company_name": company.company_name,
- "province": company.province,
- "province_code": company.province_code,
- "city": company.province,
- "city_code": company.province_code,
- "district": company.province,
- "district_code": company.province_code,
- "company_address": company.company_address,
- "responsible_person": company.responsible_person,
- "company_type": company.company_type,
- "phone":company.phone,
- "create_time": company.create_time.strftime('%Y-%m-%d')
- }
- Companys_list.append(company_info)
- # 返回结果
- return {
- "code": 200,
- "msg": "成功",
- "data": Companys_list,
- "total": total_items,
- "page": page,
- "pageSize": pageSize,
- "totalPages": (total_items + pageSize - 1) // pageSize
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.get('/company/{id}')
- async def get_company(
- id: str ,
- db: Session = Depends(get_db),
- user_id = Depends(valid_access_token)
- ):
- try:
- # 构建查询
- query = db.query(CompanyManagementBaseInfo)
- query = query.filter(CompanyManagementBaseInfo.del_flag != '2')
- # 应用查询条件
- if id:
- query = query.filter(CompanyManagementBaseInfo.id == id)
- # 执行查询
- company = query.first()
- if not company:
- detail = "数据不存在"
- return JSONResponse(status_code=404, content={"code":404,"msg":"数据不存在"})
- company_result = {
- "id": company.id,
- "company_name": company.company_name,
- "province": company.province,
- "province_code": company.province_code,
- "city": company.province,
- "city_code": company.province_code,
- "district": company.province,
- "district_code": company.province_code,
- "company_address": company.company_address,
- "responsible_person": company.responsible_person,
- "company_type": company.company_type,
- "phone":company.phone,
- "create_time": company.create_time.strftime('%Y-%m-%d')
- }
- # 返回结果
- return {
- "code": 200,
- "msg": "成功",
- "data": company_result
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- if str(e)=='':
- e = detail
- raise HTTPException(status_code=500, detail=str(e))
- @router.post('/company/create')
- async def create_company(
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)
- ):
- try:
- new_company = CompanyManagementBaseInfo(
- id=new_guid(),
- company_name = body['company_name'],
- province = body['province'],
- province_code = body['province_code'],
- city = body['city'],
- city_code = body['city_code'],
- district = body['district'],
- district_code = body['district_code'],
- company_address = body['company_address'],
- responsible_person = body['responsible_person'],
- company_type =body['company_type'],
- phone = body['phone'],
- create_by = user_id
- )
- # 添加到数据库会话并提交
- db.add(new_company)
- db.commit()
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.put('/company/update')
- async def update_company(
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)
- ):
- try:
- # 提取请求数据
- query = db.query(CompanyManagementBaseInfo)
- query = query.filter(CompanyManagementBaseInfo.id == body['id'])
- query = query.filter(CompanyManagementBaseInfo.del_flag != '2')
- company = query.first()
- if not company:
- return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})
- if 'company_name' in body:
- company.company_name = body['company_name']
- if 'province' in body:
- company.province = body['province']
- if 'province_code' in body:
- company.province_code = body['province_code']
- if 'city' in body:
- company.city = body['city']
- if 'city_code' in body:
- company.city_code = body['city_code']
- if 'district' in body:
- company.district = body['district']
- if 'district_code' in body:
- company.district_code = body['district_code']
- if 'company_address' in body:
- company.company_address = body['company_address']
- if 'responsible_person' in body:
- company.responsible_person = body['responsible_person']
- if 'company_type' in body:
- company.company_type = body['company_type']
- if 'phone' in body:
- company.phone = body['phone']
- company.update_by = user_id
- # 更新到数据库会话并提交
- db.commit()
- db.refresh(company) # 可选,如果需要刷新实例状态
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.delete('/company/delete')
- async def delete_companys(
- companyIds: list,
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)
- ):
- try:
- # 提取请求数据
- query = db.query(CompanyManagementBaseInfo)
- query = query.filter(CompanyManagementBaseInfo.del_flag != '2')
- query = query.filter(CompanyManagementBaseInfo.id.in_(companyIds))
- companys = query.all()
- if not companys:
- return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})
- for company in companys:
- company.del_flag = '2'
- company.update_by=user_id
- # 更新到数据库会话并提交
- db.commit()
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "删除成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.delete('/company/delete/{companyId}')
- async def delete_company(
- companyId: str,
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)
- ):
- try:
- # 提取请求数据
- query = db.query(CompanyManagementBaseInfo)
- query = query.filter(CompanyManagementBaseInfo.del_flag != '2')
- query = query.filter(CompanyManagementBaseInfo.id==companyId)
- company = query.first()
- if not company:
- return JSONResponse(status_code=404, content={"code": 404, "msg": "数据不存在"})
- company.del_flag = '2'
- company.update_by = user_id
- # 更新到数据库会话并提交
- db.commit()
- db.refresh(company) # 可选,如果需要刷新实例状态
- # 返回创建成功的响应
- return {
- "code": 200,
- "msg": "删除成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
|