Browse Source

250314-1代码。

baoyubo 2 months ago
parent
commit
bc4f2e47fc

+ 2 - 0
routers/api/videoResource/__init__.py

@@ -5,10 +5,12 @@ from .hkvideo import router as hkvideo_router
 from .videoinfo import router as videoinfo_router
 from .avcon import router as avcon_router
 from .wrjvideo import router as wrjvideo_router
+from .tag import router as tag_router
 
 router = APIRouter()
 
 router.include_router(hkvideo_router, prefix="/hkvideo")
 router.include_router(videoinfo_router, prefix="/videoinfo")
+router.include_router(tag_router, prefix="/tag")
 router.include_router(avcon_router, prefix="/avcon")
 router.include_router(wrjvideo_router, prefix="/wrjvideo")

+ 306 - 0
routers/api/videoResource/tag/__init__.py

@@ -0,0 +1,306 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from common.security import valid_access_token
+from fastapi.responses import JSONResponse
+from sqlalchemy.orm import Session
+from sqlalchemy import and_, or_
+from sqlalchemy.sql import func
+from common.auth_user import *
+from pydantic import BaseModel
+from database import get_db
+from typing import List
+from models import *
+from utils import *
+from utils.ry_system_util import *
+from utils.video_util import *
+import traceback
+import json
+
+router = APIRouter()
+
+
+@router.get('/list')
+async def get_dict_data_by_type(
+    # dictType: str = Query(None, max_length=100),
+    pageNum: int = Query(1, gt=0),
+    pageSize: int = Query(10, gt=0),
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 根据 dict_type 查询字典数据
+        query = db.query(SysDictData)
+        query = query.filter(SysDictData.dict_type == 'video_type')
+        query = query.filter(SysDictData.del_flag != '2')
+        query = query.order_by(SysDictData.dict_sort)
+        # dict_data = db.query(SysDictData).filter_by(dict_type==dict_type and del_flag != '2').all()
+        dict_data = query.all()
+        # 将模型转换为字典
+        dict_data_list = [
+            {
+                "dictCode": d.dict_code,
+                "dictSort": d.dict_sort,
+                "dictLabel": d.dict_label,
+                "dictValue": d.dict_value,
+                "dictType": d.dict_type,
+                "cssClass": d.css_class,
+                "listClass": d.list_class,
+                "isDefault": d.is_default,
+                "remark": d.remark,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
+            }
+            for d in dict_data
+        ]
+
+        # 构建返回结果
+        result = {
+            "code": 200,
+            "msg": "操作成功",
+            "data": dict_data_list
+        }
+        return result
+    except Exception as e:
+        # 处理异常
+
+        traceback.print_exc()
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+
+@router.get('/info/{dictCode}')
+async def get_dict_data_by_type(
+    dictCode: str ,
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 根据 dict_type 查询字典数据
+        # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all()
+        query = db.query(SysDictData)
+
+        # 添加查询条件
+        # if dictType:
+        query = query.filter(SysDictData.dict_code==dictCode)
+        query = query.filter(SysDictData.del_flag != '2')
+
+
+        d = query.first()
+
+        # 转换为字典
+        dict_data_list = {
+                "dictCode": d.dict_code,
+                "dictSort": d.dict_sort,
+                "dictLabel": d.dict_label,
+                "dictValue": d.dict_value,
+                "dictType": d.dict_type,
+                "cssClass": d.css_class,
+                "listClass": d.list_class,
+                "isDefault": d.is_default,
+                "remark": d.remark,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
+            }
+
+        # 构建返回结果
+        result = {
+            "data": dict_data_list,
+            "code": 200,
+            "msg": "查询成功"
+        }
+        return result
+
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+
+
+
+@router.post('/add')
+async def create_dict_data(
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 创建一个新的 SysDictData 实例
+        dict_label = body['dict_label']
+        dict_info = dict_label_get_dict_data_info(db, 'dict_type', dict_label)
+        if dict_info is None:
+            new_dict_data = SysDictData(
+                dict_label=dict_label,
+                dict_value=new_guid(),
+                dict_type='dict_type',
+                list_class='default',
+                create_by=user_id
+            )
+
+            # 添加到会话并提交
+            db.add(new_dict_data)
+            db.commit()
+            db.refresh(new_dict_data)  # 可选,如果需要刷新实例状态
+
+        # 构建返回结果
+        result = {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None
+        }
+        return result
+    except Exception as e:
+        traceback.print_exc()
+        # 处理异常
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+
+
+
+@router.put("/update")
+async def updata_dict_type(
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 从请求数据创建一个新的 SysDictType 实例
+        query = db.query(SysDictData)
+        query = query.filter(SysDictData.dict_type == 'video_type')
+        query = query.filter(SysDictData.dict_code == body['dictCode'])
+        query = query.filter(SysDictData.del_flag != '2')
+
+        # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode)
+        # query = db.query(SysDictData).filter(SysDictData.del_flag != '2')
+
+        dict_data = query.first()
+        if not dict_data:
+            return JSONResponse(status_code=404, content={
+                'errcode': 404,
+                'errmsg': f"{body['dictCode']}不存在"
+            })
+            # detail = "dict_data不存在"
+            # raise HTTPException(status_code=404, detail="dict_data不存在")
+        if "dictSort" in body:
+            dict_data.dict_sort = body['dictSort']
+        if "dictLabel" in body:
+            dict_data.dict_label = body['dictLabel']
+        if "remark" in body:
+            dict_data.remark = body['remark']
+
+        # 添加到数据库会话并提交
+        db.commit()
+        # db.refresh(new_dict_type)  # 可选,如果需要刷新实例状态
+
+        # 构建并返回响应
+        return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None  # 根据你的响应示例,data 为 null
+        }
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        # if str(e)=='':
+        #     e=detail
+        raise HTTPException(status_code=500, detail=str(e))
+
+@router.get('/isNull')
+async def get_video_tag_info(
+        dict_value:str = Query(None),
+        db: Session = Depends(get_db),
+        body=Depends(remove_xss_json),
+        # page: int = Query(1, gt=0, description='页码'),
+        # pageSize: int = Query(10, gt=0, description='每页条目数量'),
+        user_id=Depends(valid_access_token)
+):
+    try:
+        num = 0
+        if dict_value:
+            num = len(tag_get_video_tag_list(db,dict_value))
+        return {
+            "code": 200,
+            "msg": "成功",
+            "videonum":num,
+            "data": f'此标签已被{num}个摄像头使用,是否确认删除'
+        }
+    except Exception as e:
+        traceback.print_exc()
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+
+@router.delete("/delete/{dictCode}")  # 使用 ID 来标识要删除的接口
+async def delete_dict_data(
+    dictCode: str,
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)
+):
+    try:
+        # 从数据库中获取要删除的 OneShareApiEntity 实例
+        query = db.query(SysDictData)
+        query = query.filter(SysDictData.dict_type == 'video_type')
+        query = query.filter(SysDictData.dict_code == dictCode)
+        query = query.filter(SysDictData.del_flag != '2')
+        dict_data = query.first()
+        # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first()
+
+        if not dict_data:
+            return JSONResponse(status_code=404, content={
+                'errcode': 404,
+                'errmsg': f'{dictCode}不存在'
+            })
+
+            # detail = "dict_data不存在"
+            # raise HTTPException(status_code=404, detail="dict_data不存在")
+        dict_data.del_flag = '2'
+        for i in get_video_tag_list(db,dict_data.dict_value):
+            i.del_flag = '2'
+        # 删除实例
+        # db.delete(api)
+        db.commit()
+
+        # 构建并返回响应
+        return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None
+        }
+    except Exception as e:
+        # 处理异常
+        traceback.print_exc()
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+#
+# @router.delete("/delete/list/{dictCode}")  # 使用 ID 来标识要删除的接口
+# async def delete_dict_data(
+#     dictCode: str,
+#     db: Session = Depends(get_db),
+#     body = Depends(remove_xss_json),
+#     user_id = Depends(valid_access_token)
+# ):
+#     try:
+#         # 从数据库中获取要删除的 OneShareApiEntity 实例
+#         query = db.query(SysDictData)
+#         query = query.filter(SysDictData.del_flag != '2')
+#         dictCodeList = dictCode.split(',')
+#         query = query.filter(SysDictData.dict_code.in_(dictCodeList))
+#         dict_data = query.all()
+#         # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first()
+#
+#         if not dict_data:
+#             return JSONResponse(content={"msg":"dict不存在"},status_code=404)
+#             # detail = "dict不存在"
+#             # raise HTTPException(status_code=404, detail="dict不存在")
+#         for dict in dict_data:
+#             dict.del_flag = '2'
+#         # 删除实例
+#         # db.delete(api)
+#         db.commit()
+#
+#         # 构建并返回响应
+#         return {
+#             "code": 200,
+#             "msg": "操作成功",
+#             "data": None
+#         }
+#     except Exception as e:
+#         # 处理异常
+#         traceback.print_exc()
+#         raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))

+ 7 - 0
utils/video_util.py

@@ -5,4 +5,11 @@ def get_video_tag_list(db,video_code:str):
     query = query.filter(TpVideoTag.del_flag != '2')
     query = query.filter(TpVideoTag.video_code == video_code)
     query = query.order_by(TpVideoTag.video_code)
+    return query.all()
+
+def tag_get_video_tag_list(db,dict_value:str):
+    query = db.query(TpVideoTag)
+    query = query.filter(TpVideoTag.del_flag != '2')
+    query = query.filter(TpVideoTag.dict_value == dict_value)
+    query = query.order_by(TpVideoTag.video_code)
     return query.all()