|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from sqlalchemy.sql import func
- from common.auth_user import *
- from pydantic import BaseModel
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- from utils.ry_system_util import *
- from utils.video_util import *
- import traceback
- import json
- router = APIRouter()
def name_get_user_id(db, keywords):
    """Return the ids of emergency contacts whose name contains ``keywords``.

    Rows whose ``del_flag`` equals ``'2'`` are excluded.

    Args:
        db: SQLAlchemy session used to run the query.
        keywords: Substring matched against ``EmergencyContactUser.name``
            through a ``LIKE '%keywords%'`` pattern.

    Returns:
        A list holding the ``id`` of every matching row.
    """
    name_pattern = f"%{keywords}%"
    matches = (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.name.like(name_pattern))
        .all()
    )
    return [contact.id for contact in matches]
def user_id_get_duty_id(db, user_id_list):
    """Return the duty ids that any of the given personnel are arranged on.

    Rows whose ``del_flag`` equals ``'2'`` are excluded.

    Args:
        db: SQLAlchemy session used to run the query.
        user_id_list: Personnel ids matched against
            ``DutyPersonnelArrangement.personnel_id``.

    Returns:
        A list with the ``duty_id`` of every matching arrangement row (one
        entry per row, so a duty id may appear more than once).
    """
    arrangements = (
        db.query(DutyPersonnelArrangement)
        .filter(DutyPersonnelArrangement.del_flag != '2')
        .filter(DutyPersonnelArrangement.personnel_id.in_(user_id_list))
        .all()
    )
    return [arrangement.duty_id for arrangement in arrangements]
def duty_id_get_user_id(db, duty_id):
    """Return the personnel arrangement rows attached to one duty schedule.

    Rows whose ``del_flag`` equals ``'2'`` are excluded.

    Args:
        db: SQLAlchemy session used to run the query.
        duty_id: Value matched against ``DutyPersonnelArrangement.duty_id``.

    Returns:
        A list of ``DutyPersonnelArrangement`` instances (possibly empty).
    """
    return (
        db.query(DutyPersonnelArrangement)
        .filter(DutyPersonnelArrangement.del_flag != '2')
        .filter(DutyPersonnelArrangement.duty_id == duty_id)
        .all()
    )
def user_id_get_info(db, user_id):
    """Fetch a single emergency contact by id.

    Rows whose ``del_flag`` equals ``'2'`` are excluded.

    Args:
        db: SQLAlchemy session used to run the query.
        user_id: Value matched against ``EmergencyContactUser.id``.

    Returns:
        The first matching ``EmergencyContactUser``, or ``None`` when no row
        matches.
    """
    return (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.id == user_id)
        .first()
    )
def get_duty_unit_id_list(db):
    """Return the duty-unit ids configured in the dictionary table.

    Reads every ``SysDictData`` row with ``dict_type == 'duty_unit'`` whose
    ``del_flag`` is not ``'2'`` and converts each ``dict_value`` to ``int``.

    Args:
        db: SQLAlchemy session used to run the query.

    Returns:
        A list of integers, one per matching dictionary row.

    Raises:
        ValueError: If a ``dict_value`` cannot be converted by ``int()``.
    """
    dict_rows = (
        db.query(SysDictData)
        .filter(SysDictData.del_flag != '2')
        .filter(SysDictData.dict_type == 'duty_unit')
        .all()
    )
    unit_ids = []
    for row in dict_rows:
        unit_ids.append(int(row.dict_value))
    return unit_ids
def dept_id_get_duty_unit_id(db, dept_id, duty_unit_id_list):
    """Walk up the department tree until a duty-unit department is found.

    Starting at ``dept_id``, follow ``parent_department_id`` links until a
    department id contained in ``duty_unit_id_list`` is reached.

    The previous recursive version dropped the result of the recursive call
    (missing ``return``), so every lookup that had to climb at least one
    level returned ``None``. This version iterates, and also stops on a
    missing parent or on a cycle in the parent links.

    Args:
        db: SQLAlchemy session used to look up departments.
        dept_id: Id of the department to start from. ``0`` or ``None`` means
            "no department".
        duty_unit_id_list: Ids of the departments that count as duty units.

    Returns:
        The id of the first department on the path to the root that is a
        duty unit, or ``0`` when none is found.
    """
    visited = set()
    current_id = dept_id
    # A falsy id (0 / None) marks the top of the tree; ``visited`` protects
    # against malformed data where parent links form a loop.
    while current_id and current_id not in visited:
        if current_id in duty_unit_id_list:
            return current_id
        visited.add(current_id)
        department = (
            db.query(EmergencyContactDepartment)
            .filter(EmergencyContactDepartment.del_flag != '2')
            .filter(EmergencyContactDepartment.id == current_id)
            .first()
        )
        if not department:
            return 0
        current_id = department.parent_department_id
    return 0
def mobile_phone_get_dept_id(db, mobile_phone, duty_unit_id_list):
    """Resolve a mobile phone number to the duty unit of its owner.

    Looks up the first ``EmergencyContactUser`` (``del_flag`` not ``'2'``)
    with the given ``mobile_phone`` and passes its ``department_id`` to
    ``dept_id_get_duty_unit_id``.

    Args:
        db: SQLAlchemy session used to run the queries.
        mobile_phone: Value matched against
            ``EmergencyContactUser.mobile_phone``.
        duty_unit_id_list: Duty-unit ids forwarded to
            ``dept_id_get_duty_unit_id``.

    Returns:
        Whatever ``dept_id_get_duty_unit_id`` returns for the contact's
        department, or ``0`` when no contact has this phone number.
    """
    contact = (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.mobile_phone == mobile_phone)
        .first()
    )
    if not contact:
        return 0
    return dept_id_get_duty_unit_id(db, contact.department_id, duty_unit_id_list)
@router.get('/list')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    duty_type: str = Query(None),
    start_time: str = Query(None),
    end_time: str = Query(None),
    duty_unit: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one page of duty schedules with the personnel arranged on each.

    Args:
        keywords: Optional substring of a contact name; only schedules that
            have a matching person arranged are returned.
        duty_type: Optional exact match on ``DutySchedule.duty_type``.
        start_time: Optional lower bound for ``duty_date`` (``YYYY-MM-DD``).
        end_time: Optional upper bound for ``duty_date`` (``YYYY-MM-DD``).
        duty_unit: Optional exact match on ``DutySchedule.duty_unit``.
        page: 1-based page number.
        pageSize: Number of schedules per page.
        db: Database session dependency.
        body: Request body dependency (unused here).
        user_id: Authenticated user id dependency (unused here).

    Returns:
        A dict with ``total``, ``page``, ``pageSize``, ``totalPages``,
        ``data``, ``code`` and ``msg``; on any exception a 404
        ``JSONResponse`` carrying the exception text.
    """
    try:
        query = db.query(DutySchedule)
        query = query.filter(DutySchedule.del_flag != '2')

        # Optional filters.
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if duty_type:
            query = query.filter(DutySchedule.duty_type == duty_type)
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)

        # Total is counted before pagination is applied.
        total_count = query.count()

        offset = (page - 1) * pageSize
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.offset(offset).limit(pageSize).all()

        data_list = []
        for d in duty_data:
            user_data = []
            for user_info in duty_id_get_user_id(db, d.id):
                # user_id_get_info returns None when the contact no longer
                # exists (or is soft-deleted); getattr keeps the arrangement
                # in the output with empty contact fields instead of failing
                # the whole request with AttributeError.
                contact_info = user_id_get_info(db, user_info.personnel_id)
                user_data.append({"position_id": user_info.position_id,
                                  "personnel_id": user_info.personnel_id,
                                  "name": getattr(contact_info, "name", None),
                                  "position": getattr(contact_info, "position", None),
                                  "mobile_phone": getattr(contact_info, "mobile_phone", None),
                                  "office_phone": getattr(contact_info, "office_phone", None),
                                  "department_id": getattr(contact_info, "department_id", None),
                                  "yzy_userid": getattr(contact_info, "userid", None)})
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
                "user_data": user_data
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            # Ceiling division.
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/latest_by_group')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    start_time: str = Query(None),
    end_time: str = Query(None),
    duty_unit: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return the most recently updated schedule per (duty_date, shift_type).

    When ``duty_unit`` is not supplied, the unit is derived from the calling
    user's phone number via ``mobile_phone_get_dept_id``; if the user cannot
    be found an empty page is returned.

    Args:
        keywords: Optional substring of a contact name; only schedules that
            have a matching person arranged are returned.
        start_time: Optional lower bound for ``duty_date`` (``YYYY-MM-DD``).
        end_time: Optional upper bound for ``duty_date`` (``YYYY-MM-DD``).
        duty_unit: Optional exact match on ``DutySchedule.duty_unit``.
        page: 1-based page number.
        pageSize: Number of schedules per page.
        db: Database session dependency.
        body: Request body dependency (unused here).
        user_id: Authenticated user id dependency.

    Returns:
        A dict with ``total``, ``page``, ``pageSize``, ``totalPages``,
        ``data``, ``code`` and ``msg``; on any exception a 404
        ``JSONResponse`` carrying the exception text.
    """
    try:
        query = db.query(DutySchedule)
        query = query.filter(DutySchedule.del_flag != '2')

        # Optional filters.
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # The None check must come before any attribute access. The old
            # debug print dereferenced now_user_info first, which made this
            # branch unreachable (AttributeError) and wrote the user's phone
            # number to stdout.
            if not now_user_info:
                return {
                    "total": 0,
                    "page": page,
                    "pageSize": pageSize,
                    "totalPages": 0,
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db),
            )
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Latest update_time per (duty_date, shift_type).
        # NOTE(review): this sub-query is not restricted to the duty unit
        # selected above, so a newer row of another unit for the same
        # date/shift hides this unit's row — confirm that this is intended.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_update_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_update_time
            )
        )

        # Total is counted before pagination is applied.
        total_count = query.count()

        offset = (page - 1) * pageSize
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.offset(offset).limit(pageSize).all()

        data_list = []
        for d in duty_data:
            user_data = []
            for user_info in duty_id_get_user_id(db, d.id):
                # user_id_get_info returns None for a missing/soft-deleted
                # contact; getattr keeps the entry with empty contact fields
                # instead of raising AttributeError.
                contact_info = user_id_get_info(db, user_info.personnel_id)
                user_data.append({"position_id": user_info.position_id,
                                  "personnel_id": user_info.personnel_id,
                                  "name": getattr(contact_info, "name", None),
                                  "position": getattr(contact_info, "position", None),
                                  "mobile_phone": getattr(contact_info, "mobile_phone", None),
                                  "office_phone": getattr(contact_info, "office_phone", None),
                                  "department_id": getattr(contact_info, "department_id", None),
                                  "yzy_userid": getattr(contact_info, "userid", None)})
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
                "user_data": user_data
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            # Ceiling division.
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/info/{id}')
async def get_dict_data_by_type(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one duty schedule together with its arranged personnel.

    Args:
        id: Path parameter matched against ``DutySchedule.id``.
        db: Database session dependency.
        body: Request body dependency (unused here).
        user_id: Authenticated user id dependency (unused here).

    Returns:
        A dict with ``data``, ``code`` and ``msg``. A 404 ``JSONResponse`` is
        returned when the schedule does not exist or is soft-deleted, and
        also (carrying the exception text) on any exception.
    """
    try:
        query = db.query(DutySchedule)
        query = query.filter(DutySchedule.id == id)
        query = query.filter(DutySchedule.del_flag != '2')
        d = query.first()
        # Without this guard a missing schedule surfaced as the message
        # "'NoneType' object has no attribute 'id'".
        if not d:
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '值班不存在'
            })

        user_data = []
        for user_info in duty_id_get_user_id(db, d.id):
            # user_id_get_info returns None for a missing/soft-deleted
            # contact; getattr keeps the entry with empty contact fields
            # instead of raising AttributeError.
            contact_info = user_id_get_info(db, user_info.personnel_id)
            user_data.append({"position_id": user_info.position_id,
                              "personnel_id": user_info.personnel_id,
                              "name": getattr(contact_info, "name", None),
                              "position": getattr(contact_info, "position", None),
                              "mobile_phone": getattr(contact_info, "mobile_phone", None),
                              "office_phone": getattr(contact_info, "office_phone", None),
                              "department_id": getattr(contact_info, "department_id", None),
                              "yzy_userid": getattr(contact_info, "userid", None)})

        # start_time / end_time are returned as stored (not strftime-formatted
        # as in the list endpoints); kept that way for existing clients.
        data_list = {
            "id": d.id,
            "start_time": d.start_time,
            "end_time": d.end_time,
            "duty_date": d.duty_date,
            "shift_type": d.shift_type,
            "duty_unit": d.duty_unit,
            "duty_type": d.duty_type,
            "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
            "user_data": user_data
        }

        return {
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create one duty schedule and its personnel arrangements atomically.

    The request body must provide ``start_time``, ``end_time``, ``duty_date``,
    ``shift_type``, ``duty_unit``, ``duty_type`` and a non-empty ``user_data``
    list whose items carry ``position_id`` and ``personnel_id``.

    Args:
        db: Database session dependency.
        body: Parsed request body dependency.
        user_id: Authenticated user id, stored as ``create_by``.

    Returns:
        ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when
        ``user_data`` is empty or when any exception occurs (in which case
        nothing is persisted).
    """
    try:
        user_data = body['user_data']
        if not user_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班人员不能为空'
            })

        new_duty_data = DutySchedule(
            start_time=body['start_time'],
            end_time=body['end_time'],
            duty_date=body['duty_date'],
            shift_type=body['shift_type'],
            duty_unit=body['duty_unit'],
            duty_type=body['duty_type'],
            create_by=user_id
        )
        db.add(new_duty_data)
        # flush() sends the INSERT so the primary key is populated, but keeps
        # the transaction open: schedule and arrangements are then committed
        # together, and a failure below cannot leave an orphaned schedule.
        db.flush()

        db.add_all([
            DutyPersonnelArrangement(
                duty_id=new_duty_data.id,
                position_id=user_info['position_id'],
                personnel_id=user_info['personnel_id'],
                create_by=user_id
            )
            for user_info in user_data
        ])
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Discard any partially flushed rows.
        db.rollback()
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/list_create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create several duty schedules (with arrangements) in one transaction.

    The request body must be a list; each item has the same shape as the
    body of the single-create endpoint (schedule fields plus a non-empty
    ``user_data`` list).

    Previously every item was committed on its own and validated inside the
    loop, so an invalid or failing later item left the earlier ones
    persisted. Now all items are validated first and committed together.

    Args:
        db: Database session dependency.
        body: Parsed request body dependency.
        user_id: Authenticated user id, stored as ``create_by``.

    Returns:
        ``{"code": 200, ...}`` on success; a 500 ``JSONResponse`` when the
        body is not a list; a 404 ``JSONResponse`` when any item has empty
        ``user_data`` or when any exception occurs (nothing is persisted).
    """
    try:
        if not isinstance(body, list):
            return JSONResponse(status_code=500, content={'msg': "请求体非列表", "code": 500})

        # Validate the whole batch before touching the database.
        for data in body:
            if not data['user_data']:
                return JSONResponse(status_code=404, content={
                    'errcode': 404,
                    'errmsg': '值班人员不能为空'
                })

        for data in body:
            new_duty_data = DutySchedule(
                start_time=data['start_time'],
                end_time=data['end_time'],
                duty_date=data['duty_date'],
                shift_type=data['shift_type'],
                duty_unit=data['duty_unit'],
                duty_type=data['duty_type'],
                create_by=user_id
            )
            db.add(new_duty_data)
            # flush() populates the primary key without ending the
            # transaction.
            db.flush()
            db.add_all([
                DutyPersonnelArrangement(
                    duty_id=new_duty_data.id,
                    position_id=user_info['position_id'],
                    personnel_id=user_info['personnel_id'],
                    create_by=user_id
                )
                for user_info in data['user_data']
            ])

        # Single commit: either every schedule is created or none is.
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.put("/update")
async def updata_dict_type(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update a duty schedule and replace its personnel arrangements.

    The request body must provide ``id``, the schedule fields
    (``start_time``, ``end_time``, ``duty_date``, ``shift_type``,
    ``duty_unit``, ``duty_type``) and a non-empty ``user_data`` list whose
    items carry ``position_id`` and ``personnel_id``.

    Fixes over the previous version:
      * the existence check referenced an undefined name (``dutyn_data``),
        so every request reaching it failed with ``NameError``;
      * that failure happened after the old arrangements had already been
        soft-deleted and committed, losing the schedule's personnel;
      * the schedule was dereferenced before being checked for ``None``.

    All validation now happens first and all changes share one commit.

    Args:
        db: Database session dependency.
        body: Parsed request body dependency.
        user_id: Authenticated user id, stored as ``create_by`` /
            ``update_by``.

    Returns:
        ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when the
        schedule does not exist, when ``user_data`` is empty, or when any
        exception occurs (nothing is persisted in those cases).
    """
    try:
        query = db.query(DutySchedule)
        query = query.filter(DutySchedule.id == body['id'])
        query = query.filter(DutySchedule.del_flag != '2')
        duty_data = query.first()
        if not duty_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班不存在'
            })

        user_data = body['user_data']
        if not user_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班人员不能为空'
            })

        # Soft-delete the current arrangements. They are queried BEFORE the
        # replacements are added so the new rows are not picked up here.
        for info in duty_id_get_user_id(db, duty_data.id):
            info.del_flag = '2'

        db.add_all([
            DutyPersonnelArrangement(
                duty_id=duty_data.id,
                position_id=user_info['position_id'],
                personnel_id=user_info['personnel_id'],
                create_by=user_id
            )
            for user_info in user_data
        ])

        duty_data.start_time = body['start_time']
        duty_data.end_time = body['end_time']
        duty_data.duty_date = body['duty_date']
        duty_data.shift_type = body['shift_type']
        duty_data.duty_unit = body['duty_unit']
        duty_data.duty_type = body['duty_type']
        duty_data.update_by = user_id

        # One commit for the soft-deletes, the new arrangements and the
        # schedule fields.
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Undo pending changes (e.g. a missing key in the body half-way
        # through).
        db.rollback()
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/delete/{id}")  # the path id selects the duty schedule to delete
async def delete_dict_data(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete one duty schedule by setting its ``del_flag`` to ``'2'``.

    Args:
        id: Path parameter matched against ``DutySchedule.id``.
        db: Database session dependency.
        body: Request body dependency (unused here).
        user_id: Authenticated user id dependency (unused here).

    Returns:
        ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when no
        non-deleted schedule has this id or when any exception occurs.
    """
    try:
        duty_record = (
            db.query(DutySchedule)
            .filter(DutySchedule.id == id)
            .filter(DutySchedule.del_flag != '2')
            .first()
        )
        if duty_record:
            # Mark as deleted rather than removing the row.
            duty_record.del_flag = '2'
            db.commit()
            return {
                "code": 200,
                "msg": "操作成功",
                "data": None
            }
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': '值班不存在'
        })
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/delete_list/{id_list}")  # the path carries comma-separated schedule ids
async def delete_dict_data(
    id_list: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete several duty schedules by setting ``del_flag`` to ``'2'``.

    Args:
        id_list: Comma-separated integer ids, e.g. ``"1,2,3"``.
        db: Database session dependency.
        body: Request body dependency (unused here).
        user_id: Authenticated user id dependency (unused here).

    Returns:
        ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when none
        of the ids matches a non-deleted schedule or when any exception
        occurs (including an id that ``int()`` cannot parse).
    """
    try:
        parsed_ids = list(map(int, id_list.split(',')))
        duty_records = (
            db.query(DutySchedule)
            .filter(DutySchedule.id.in_(parsed_ids))
            .filter(DutySchedule.del_flag != '2')
            .all()
        )
        if duty_records:
            # Mark as deleted rather than removing the rows.
            for record in duty_records:
                record.del_flag = '2'
            db.commit()
            return {
                "code": 200,
                "msg": "操作成功",
                "data": None
            }
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': '值班不存在'
        })
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
|