position.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
  4. from common.security import valid_access_token
  5. from fastapi.responses import JSONResponse
  6. from sqlalchemy.orm import Session
  7. from sqlalchemy import and_, or_
  8. from sqlalchemy.sql import func
  9. from common.auth_user import *
  10. from pydantic import BaseModel
  11. from database import get_db
  12. from typing import List
  13. from models import *
  14. from utils import *
  15. from utils.ry_system_util import *
  16. from utils.video_util import *
  17. import traceback
  18. import json
# Router instance that the endpoint decorators in this module register their routes on.
router = APIRouter()
  20. @router.get('/list')
  21. async def get_dict_data_by_type(
  22. keywords:str =Query(None),
  23. page: int = Query(1, gt=0),
  24. pageSize: int = Query(10, gt=0),
  25. db: Session = Depends(get_db),
  26. body = Depends(remove_xss_json),
  27. user_id = Depends(valid_access_token)
  28. ):
  29. try:
  30. # 根据 dict_type 查询字典数据
  31. # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all()
  32. query = db.query(DutyPosition)
  33. query = query.filter(DutyPosition.del_flag != '2')
  34. # 添加查询条件
  35. if keywords:
  36. query = query.filter(DutyPosition.position_name.like(f'%{keywords}%'))
  37. # 计算总记录数
  38. total_count = query.count()
  39. # 计算分页
  40. offset = (page - 1) * pageSize
  41. position_data = query.offset(offset).limit(pageSize).all()
  42. # 转换为字典
  43. data_list = []
  44. for d in position_data:
  45. data_list.append({
  46. "id": d.id,
  47. "sort_number": d.sort_number,
  48. "position_name":d.position_name,
  49. "type": d.type,
  50. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  51. })
  52. # 构建返回结果
  53. result = {
  54. "total": total_count,
  55. "page": page,
  56. "pageSize": pageSize,
  57. "totalPages": (total_count + pageSize - 1) // pageSize,
  58. "data": data_list,
  59. "code": 200,
  60. "msg": "查询成功"
  61. }
  62. return result
  63. except Exception as e:
  64. # 处理异常
  65. traceback.print_exc()
  66. return JSONResponse(status_code=404, content={
  67. 'code': 404,
  68. 'msg': str(e)
  69. })
  70. @router.get('/info/{id}')
  71. async def get_dict_data_by_type(
  72. id: str ,
  73. db: Session = Depends(get_db),
  74. body = Depends(remove_xss_json),
  75. user_id = Depends(valid_access_token)
  76. ):
  77. try:
  78. # 根据 dict_type 查询字典数据
  79. # dict_data = dict_type_get_dict_data_info(db,'three_proofing')
  80. query = db.query(DutyPosition)
  81. # 添加查询条件
  82. # if dictType:
  83. query = query.filter(DutyPosition.id==id)
  84. query = query.filter(DutyPosition.del_flag != '2')
  85. d = query.first()
  86. data_list = {
  87. "id": d.id,
  88. "sort_number": d.sort_number,
  89. "position_name": d.position_name,
  90. "type": d.type,
  91. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  92. }
  93. # 构建返回结果
  94. result = {
  95. "data": data_list,
  96. "code": 200,
  97. "msg": "查询成功"
  98. }
  99. return result
  100. except Exception as e:
  101. # 处理异常
  102. traceback.print_exc()
  103. return JSONResponse(status_code=404, content={
  104. 'code': 404,
  105. 'msg': str(e)
  106. })
  107. @router.post('/create')
  108. async def create_dict_data(
  109. db: Session = Depends(get_db),
  110. body = Depends(remove_xss_json),
  111. user_id = Depends(valid_access_token)
  112. ):
  113. try:
  114. # 创建一个新的 SysDictData 实例
  115. new_position_data = DutyPosition(
  116. sort_number=body['sort_number'],
  117. position_name=body['position_name'],
  118. type=body['type'],
  119. create_by=user_id
  120. )
  121. # 添加到会话并提交
  122. db.add(new_position_data)
  123. db.commit()
  124. # 构建返回结果
  125. result = {
  126. "code": 200,
  127. "msg": "操作成功",
  128. "data": None
  129. }
  130. return result
  131. except Exception as e:
  132. # 处理异常
  133. traceback.print_exc()
  134. return JSONResponse(status_code=404, content={
  135. 'code': 404,
  136. 'msg': str(e)
  137. })
  138. @router.put("/update")
  139. async def updata_dict_type(
  140. db: Session = Depends(get_db),
  141. body = Depends(remove_xss_json),
  142. user_id = Depends(valid_access_token)
  143. ):
  144. try:
  145. # 从请求数据创建一个新的 SysDictType 实例
  146. query = db.query(DutyPosition)
  147. query = query.filter(DutyPosition.id == body['id'])
  148. query = query.filter(DutyPosition.del_flag != '2')
  149. # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode)
  150. # query = db.query(SysDictData).filter(SysDictData.del_flag != '2')
  151. position_data = query.first()
  152. if not position_data:
  153. return JSONResponse(status_code=404, content={
  154. 'errcode': 404,
  155. 'errmsg': '岗位不存在'
  156. })
  157. position_data.sort_number=body['sort_number']
  158. position_data.position_name = body['position_name']
  159. position_data.type = body['type']
  160. position_data.update_by = user_id
  161. # 添加到数据库会话并提交
  162. db.commit()
  163. # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  164. # 构建并返回响应
  165. return {
  166. "code": 200,
  167. "msg": "操作成功",
  168. "data": None # 根据你的响应示例,data 为 null
  169. }
  170. except Exception as e:
  171. # 处理异常
  172. traceback.print_exc()
  173. return JSONResponse(status_code=404, content={
  174. 'code': 404,
  175. 'msg': str(e)
  176. })
  177. @router.delete("/delete/{id}") # 使用 ID 来标识要删除的接口
  178. async def delete_dict_data(
  179. id: str,
  180. db: Session = Depends(get_db),
  181. body = Depends(remove_xss_json),
  182. user_id = Depends(valid_access_token)
  183. ):
  184. try:
  185. # 从数据库中获取要删除的 OneShareApiEntity 实例
  186. query = db.query(DutyPosition)
  187. query = query.filter(DutyPosition.id == id)
  188. query = query.filter(DutyPosition.del_flag != '2')
  189. position_data = query.first()
  190. # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first()
  191. if not position_data:
  192. return JSONResponse(status_code=404, content={
  193. 'code': 404,
  194. 'msg': '值班岗位不存在'
  195. })
  196. position_data.del_flag = '2'
  197. # 删除实例
  198. # db.delete(api)
  199. db.commit()
  200. # 构建并返回响应
  201. return {
  202. "code": 200,
  203. "msg": "操作成功",
  204. "data": None
  205. }
  206. except Exception as e:
  207. traceback.print_exc()
  208. return JSONResponse(status_code=404, content={
  209. 'code': 404,
  210. 'msg': str(e)
  211. })