baoyubo 1 month ago
parent
commit
f13332a447
2 changed files with 248 additions and 1 deletions
  1. 3 1
      routers/api/dutyManagement/__init__.py
  2. 245 0
      routers/api/dutyManagement/position.py

+ 3 - 1
routers/api/dutyManagement/__init__.py

@@ -1,8 +1,10 @@
 from fastapi import APIRouter
 from . import duty
 from . import contact
+from . import position
 
 router = APIRouter()
 
 router.include_router(duty.router, prefix="/duty")
-router.include_router(contact.router, prefix="/contact")
+router.include_router(contact.router, prefix="/contact")
+router.include_router(position.router, prefix="/position")

+ 245 - 0
routers/api/dutyManagement/position.py

@@ -0,0 +1,245 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from common.security import valid_access_token
+from fastapi.responses import JSONResponse
+from sqlalchemy.orm import Session
+from sqlalchemy import and_, or_
+from sqlalchemy.sql import func
+from common.auth_user import *
+from pydantic import BaseModel
+from database import get_db
+from typing import List
+from models import *
+from utils import *
+from utils.ry_system_util import *
+from utils.video_util import *
+import traceback
+import json
+
+router = APIRouter()
+
+
+
+
+@router.get('/list')
+async def get_dict_data_by_type(
+    keywords:str =Query(None),
+    page: int = Query(1, gt=0),
+    pageSize: int = Query(10, gt=0),
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 根据 dict_type 查询字典数据
+        # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all()
+        query = db.query(DutyPosition)
+
+        query = query.filter(DutyPosition.del_flag != '2')
+        # 添加查询条件
+        if keyword:
+            query = query.filter(DutyPosition.position_name.like(f'%{keywords}%'))
+
+        # 计算总记录数
+        total_count = query.count()
+
+        # 计算分页
+        offset = (page - 1) * pageSize
+        position_data = query.offset(offset).limit(pageSize).all()
+
+        # 转换为字典
+        data_list = []
+        for d in position_data:
+            data_list.append({
+                "id": d.id,
+                "sort_number": d.sort_number,
+                "position_name":d.position_name,
+                "type": d.type,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
+            })
+        # 构建返回结果
+        result = {
+            "total": total_count,
+            "page": page,
+            "pageSize": pageSize,
+            "totalPages": (total_count + pageSize - 1) // pageSize,
+            "data": data_list,
+            "code": 200,
+            "msg": "查询成功"
+        }
+        return result
+
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        return JSONResponse(status_code=404, content={
+            'code': 404,
+            'msg': str(e)
+        })
+
+
+@router.get('/info/{id}')
+async def get_dict_data_by_type(
+    id: str ,
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 根据 dict_type 查询字典数据
+        # dict_data = dict_type_get_dict_data_info(db,'three_proofing')
+        query = db.query(DutyPosition)
+
+        # 添加查询条件
+        # if dictType:
+        query = query.filter(DutyPosition.id==id)
+        query = query.filter(DutyPosition.del_flag != '2')
+
+
+        d = query.first()
+        data_list = {
+            "id": d.id,
+            "sort_number": d.sort_number,
+            "position_name": d.position_name,
+            "type": d.type,
+            "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
+        }
+        # 构建返回结果
+        result = {
+            "data": data_list,
+            "code": 200,
+            "msg": "查询成功"
+        }
+        return result
+
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        return JSONResponse(status_code=404, content={
+            'code': 404,
+            'msg': str(e)
+        })
+
+
+
+@router.post('/create')
+async def create_dict_data(
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 创建一个新的 SysDictData 实例
+        new_position_data = DutyPosition(
+            sort_number=body['sort_number'],
+            position_name=body['position_name'],
+            type=body['type'],
+            create_by=user_id
+        )
+
+        # 添加到会话并提交
+        db.add(new_position_data)
+        db.commit()
+
+        # 构建返回结果
+        result = {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None
+        }
+        return result
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        return JSONResponse(status_code=404, content={
+            'code': 404,
+            'msg': str(e)
+        })
+
+
+@router.put("/update")
+async def updata_dict_type(
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 从请求数据创建一个新的 SysDictType 实例
+        query = db.query(DutyPosition)
+        query = query.filter(DutyPosition.id == body['id'])
+        query = query.filter(DutyPosition.del_flag != '2')
+
+        # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode)
+        # query = db.query(SysDictData).filter(SysDictData.del_flag != '2')
+
+        position_data = query.first()
+        if not position_data:
+            return JSONResponse(status_code=404, content={
+                'errcode': 404,
+                'errmsg': '岗位不存在'
+            })
+        position_data.sort_number=body['sort_number']
+        position_data.position_name = body['position_name']
+        position_data.type = body['type']
+        position_data.update_by = user_id
+        # 添加到数据库会话并提交
+        db.commit()
+        # db.refresh(new_dict_type)  # 可选,如果需要刷新实例状态
+
+        # 构建并返回响应
+        return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None  # 根据你的响应示例,data 为 null
+        }
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        return JSONResponse(status_code=404, content={
+            'code': 404,
+            'msg': str(e)
+        })
+
+
+
+@router.delete("/delete/{id}")  # 使用 ID 来标识要删除的接口
+async def delete_dict_data(
+    id: str,
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 从数据库中获取要删除的 OneShareApiEntity 实例
+        query = db.query(DutyPosition)
+        query = query.filter(DutyPosition.id == id)
+        query = query.filter(DutyPosition.del_flag != '2')
+        position_data = query.first()
+        # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first()
+
+        if not position_data:
+            return JSONResponse(status_code=404, content={
+                'code': 404,
+                'msg': '值班岗位不存在'
+            })
+        position_data.del_flag = '2'
+        # 删除实例
+        # db.delete(api)
+        db.commit()
+
+        # 构建并返回响应
+        return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None
+        }
+    except Exception as e:
+
+        traceback.print_exc()
+        return JSONResponse(status_code=404, content={
+            'code': 404,
+            'msg': str(e)
+        })
+