__init__.py 17 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
  4. from common.security import valid_access_token
  5. from pydantic import BaseModel
  6. from database import get_db
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy import and_, or_
  9. from typing import List
  10. from models import *
  11. from utils import *
  12. from common.auth_user import *
  13. import json
  14. from sqlalchemy.sql import func
# Router on which every dictionary-type (/type/...) and dictionary-data
# (/data/...) endpoint in this module is registered.
router = APIRouter()
  16. @router.get('/type/list')
  17. async def get_dict_types(
  18. db: Session = Depends(get_db), # 假设 get_db 是获取数据库会话的依赖项
  19. pageNum: int = Query(1, gt=0),
  20. pageSize: int = Query(10, gt=0),
  21. dictName: str = Query(None, max_length=100),
  22. dictType: str = Query(None, max_length=100),
  23. beginTime: datetime = Query(None),
  24. endTime: datetime = Query(None)
  25. ):
  26. try:
  27. # 构建查询
  28. query = db.query(SysDictType)
  29. query = query.filter(SysDictType.del_flag!='2')
  30. # 添加查询条件
  31. if dictName:
  32. query = query.filter(SysDictType.dict_name.like(f'%{dictName}%'))
  33. if dictType:
  34. query = query.filter(SysDictType.dict_type.like(f'%{dictType}%'))
  35. if beginTime and endTime:
  36. query = query.filter(
  37. and_(
  38. SysDictType.create_time >= beginTime,
  39. SysDictType.create_time <= endTime
  40. )
  41. )
  42. # 计算总记录数
  43. total_count = query.count()
  44. # 计算分页
  45. offset = (pageNum - 1) * pageSize
  46. dict_types = query.offset(offset).limit(pageSize).all()
  47. # 转换为字典
  48. dict_type_dicts = [
  49. {
  50. "dictId": d.dict_id,
  51. "dictName": d.dict_name,
  52. "dictType": d.dict_type,
  53. "remark": d.remark,
  54. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  55. }
  56. for d in dict_types
  57. ]
  58. # 构建返回结果
  59. result = {
  60. "total": total_count,
  61. "rows": dict_type_dicts,
  62. "code": 200,
  63. "msg": "查询成功"
  64. }
  65. return result
  66. except Exception as e:
  67. # 处理异常
  68. raise HTTPException(status_code=500, detail=str(e))
  69. @router.get('/type/optionselect')
  70. async def get_dict_types_optionselect(
  71. db: Session = Depends(get_db), # 假设 get_db 是获取数据库会话的依赖项
  72. ):
  73. try:
  74. # 构建查询
  75. query = db.query(SysDictType)
  76. query = query.filter(SysDictType.del_flag!='2')
  77. # 计算总记录数
  78. dict_types = query.all()
  79. # 转换为字典
  80. dict_type_dicts = [
  81. {
  82. "dictId": d.dict_id,
  83. "dictName": d.dict_name,
  84. "dictType": d.dict_type,
  85. "remark": d.remark,
  86. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  87. }
  88. for d in dict_types
  89. ]
  90. # 构建返回结果
  91. result = {
  92. "data": dict_type_dicts,
  93. "code": 200,
  94. "msg": "查询成功"
  95. }
  96. return result
  97. except Exception as e:
  98. # 处理异常
  99. raise HTTPException(status_code=500, detail=str(e))
  100. @router.get('/type/{dictId}')
  101. async def get_dict_types_optionselect(
  102. dictId: str,
  103. db: Session = Depends(get_db), # 假设 get_db 是获取数据库会话的依赖项
  104. body = Depends(remove_xss_json),
  105. user: AuthUser = Depends(get_auth_user)
  106. ):
  107. try:
  108. # 构建查询
  109. query = db.query(SysDictType)
  110. query = query.filter(SysDictType.dict_id==dictId)
  111. query = query.filter(SysDictType.del_flag!='2')
  112. dict_types = query.first()
  113. # 转换为字典
  114. dict_type_dicts ={
  115. "dictId": dict_types.dict_id,
  116. "dictName": dict_types.dict_name,
  117. "dictType": dict_types.dict_type,
  118. "remark": dict_types.remark,
  119. "createTime": dict_types.create_time.strftime('%Y-%m-%d %H:%M:%S') if dict_types.create_time else ''
  120. }
  121. # 构建返回结果
  122. result = {
  123. "data": dict_type_dicts,
  124. "code": 200,
  125. "msg": "查询成功"
  126. }
  127. return result
  128. except Exception as e:
  129. # 处理异常
  130. raise HTTPException(status_code=500, detail=str(e))
  131. class DictTypeCreateForm(BaseModel):
  132. dictName: str
  133. dictType: str
  134. remark: str = None
  135. @router.post("/type")
  136. async def create_dict_type(
  137. form_data: DictTypeCreateForm,
  138. db: Session = Depends(get_db),
  139. body = Depends(remove_xss_json),
  140. user_id = Depends(valid_access_token)
  141. ):
  142. try:
  143. # 从请求数据创建一个新的 SysDictType 实例
  144. new_dict_type = SysDictType(
  145. dict_name=form_data.dictName,
  146. dict_type=form_data.dictType,
  147. remark=form_data.remark,
  148. create_time=datetime.now(), # 假设自动设置创建时间
  149. update_time=datetime.now() # 假设自动设置更新时间
  150. )
  151. # 添加到数据库会话并提交
  152. db.add(new_dict_type)
  153. db.commit()
  154. db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  155. # 构建并返回响应
  156. return {
  157. "code": 200,
  158. "msg": "操作成功",
  159. "data": None # 根据你的响应示例,data 为 null
  160. }
  161. except Exception as e:
  162. # 处理异常
  163. raise HTTPException(status_code=500, detail=str(e))
  164. class DictTypeUpdataForm(BaseModel):
  165. createTime : str
  166. dictId : str
  167. dictName: str
  168. dictType: str
  169. remark: str = None
  170. @router.put("/type")
  171. async def updata_dict_type(
  172. form_data: DictTypeUpdataForm,
  173. db: Session = Depends(get_db),
  174. body = Depends(remove_xss_json),
  175. user_id = Depends(valid_access_token)
  176. ):
  177. try:
  178. # 从请求数据创建一个新的 SysDictType 实例
  179. dict_type = db.query(SysDictType).filter(SysDictType.dict_id == form_data.dictId and SysDictType.del_flag != '2').first()
  180. if not dict_type:
  181. detail = "dict_type不存在"
  182. raise HTTPException(status_code=404, detail="dict_type不存在")
  183. dict_type.dict_name = form_data.dictName
  184. dict_type.dict_type = form_data.dictType
  185. dict_type.remark = form_data.remark
  186. dict_type.update_time = datetime.now()
  187. # 添加到数据库会话并提交
  188. db.commit()
  189. # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  190. # 构建并返回响应
  191. return {
  192. "code": 200,
  193. "msg": "操作成功",
  194. "data": None # 根据你的响应示例,data 为 null
  195. }
  196. except Exception as e:
  197. # 处理异常
  198. if str(e)=='':
  199. e=detail
  200. raise HTTPException(status_code=500, detail=str(e))
  201. @router.delete("/type/delete/{dictId}") # 使用 ID 来标识要删除的接口
  202. async def delete_dict_type(
  203. dictId: int,
  204. db: Session = Depends(get_db),
  205. body = Depends(remove_xss_json),
  206. user_id = Depends(valid_access_token)
  207. ):
  208. try:
  209. # 从数据库中获取要删除的 OneShareApiEntity 实例
  210. dict_type = db.query(SysDictType).filter(SysDictType.dict_id == dictId and SysDictType.del_flag != '2').first()
  211. if not dict_type:
  212. detail = "dict_type不存在"
  213. raise HTTPException(status_code=404, detail="dict_type不存在")
  214. dict_type.del_flag = '2'
  215. # 删除实例
  216. # db.delete(api)
  217. db.commit()
  218. # 构建并返回响应
  219. return {
  220. "code": 200,
  221. "msg": "操作成功",
  222. "data": None
  223. }
  224. except Exception as e:
  225. # 处理异常
  226. if str(e)=='':
  227. e=detail
  228. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  229. @router.get('/data/list')
  230. async def get_dict_data_by_type(
  231. dictType: str = Query(None, max_length=100),
  232. pageNum: int = Query(1, gt=0),
  233. pageSize: int = Query(10, gt=0),
  234. db: Session = Depends(get_db),
  235. body = Depends(remove_xss_json),
  236. user_id = Depends(valid_access_token)
  237. ):
  238. try:
  239. # 根据 dict_type 查询字典数据
  240. # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all()
  241. query = db.query(SysDictData)
  242. # 添加查询条件
  243. # if dictType:
  244. query = query.filter(SysDictData.dict_type.like(f'%{dictType}%'))
  245. query = query.filter(SysDictData.del_flag != '2')
  246. # 计算总记录数
  247. total_count = query.count()
  248. # 计算分页
  249. offset = (pageNum - 1) * pageSize
  250. dict_data = query.offset(offset).limit(pageSize).all()
  251. # 转换为字典
  252. dict_data_list = [
  253. {
  254. "dictCode": d.dict_code,
  255. "dictSort": d.dict_sort,
  256. "dictLabel": d.dict_label,
  257. "dictValue": d.dict_value,
  258. "dictType": d.dict_type,
  259. "cssClass": d.css_class,
  260. "listClass": d.list_class,
  261. "isDefault": d.is_default,
  262. "remark": d.remark,
  263. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  264. }
  265. for d in dict_data
  266. ]
  267. # 构建返回结果
  268. result = {
  269. "total": total_count,
  270. "rows": dict_data_list,
  271. "code": 200,
  272. "msg": "查询成功"
  273. }
  274. return result
  275. except Exception as e:
  276. # 处理异常
  277. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  278. @router.get('/data/{dictCode}')
  279. async def get_dict_data_by_type(
  280. dictCode: str ,
  281. db: Session = Depends(get_db),
  282. body = Depends(remove_xss_json),
  283. user_id = Depends(valid_access_token)
  284. ):
  285. try:
  286. # 根据 dict_type 查询字典数据
  287. # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all()
  288. query = db.query(SysDictData)
  289. # 添加查询条件
  290. # if dictType:
  291. query = query.filter(SysDictData.dict_code==dictCode)
  292. query = query.filter(SysDictData.del_flag != '2')
  293. d = query.first()
  294. # 转换为字典
  295. dict_data_list = {
  296. "dictCode": d.dict_code,
  297. "dictSort": d.dict_sort,
  298. "dictLabel": d.dict_label,
  299. "dictValue": d.dict_value,
  300. "dictType": d.dict_type,
  301. "cssClass": d.css_class,
  302. "listClass": d.list_class,
  303. "isDefault": d.is_default,
  304. "remark": d.remark,
  305. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  306. }
  307. # 构建返回结果
  308. result = {
  309. "data": dict_data_list,
  310. "code": 200,
  311. "msg": "查询成功"
  312. }
  313. return result
  314. except Exception as e:
  315. # 处理异常
  316. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  317. class DictDataCreateForm(BaseModel):
  318. dictLabel:str
  319. dictValue:str
  320. cssClass:str
  321. listClass:str
  322. dictSort:int
  323. remark:str
  324. dictType:str
  325. @router.post('/data')
  326. async def create_dict_data(
  327. form_data: DictDataCreateForm,
  328. db: Session = Depends(get_db),
  329. body = Depends(remove_xss_json),
  330. user_id = Depends(valid_access_token)
  331. ):
  332. try:
  333. # 创建一个新的 SysDictData 实例
  334. new_dict_data = SysDictData(
  335. dict_sort=form_data.dictSort,
  336. dict_label=form_data.dictLabel,
  337. dict_value=form_data.dictValue,
  338. dict_type=form_data.dictType,
  339. css_class=form_data.cssClass,
  340. list_class=form_data.listClass,
  341. remark=form_data.remark
  342. )
  343. # 添加到会话并提交
  344. db.add(new_dict_data)
  345. db.commit()
  346. db.refresh(new_dict_data) # 可选,如果需要刷新实例状态
  347. # 构建返回结果
  348. result = {
  349. "code": 200,
  350. "msg": "操作成功",
  351. "data": None
  352. }
  353. return result
  354. except Exception as e:
  355. # 处理异常
  356. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  357. @router.get('/data/type/{dict_type}')
  358. async def get_dict_data_by_type(
  359. dict_type: str,
  360. db: Session = Depends(get_db),
  361. body = Depends(remove_xss_json),
  362. user_id = Depends(valid_access_token)
  363. ):
  364. try:
  365. # 根据 dict_type 查询字典数据
  366. query = db.query(SysDictData)
  367. query = query.filter(SysDictData.dict_type==dict_type)
  368. query = query.filter(SysDictData.del_flag != '2')
  369. # dict_data = db.query(SysDictData).filter_by(dict_type==dict_type and del_flag != '2').all()
  370. dict_data = query.all()
  371. # 将模型转换为字典
  372. dict_data_list = [
  373. {
  374. "dictCode": d.dict_code,
  375. "dictSort": d.dict_sort,
  376. "dictLabel": d.dict_label,
  377. "dictValue": d.dict_value,
  378. "dictType": d.dict_type,
  379. "cssClass": d.css_class,
  380. "listClass": d.list_class,
  381. "isDefault": d.is_default,
  382. "remark": d.remark,
  383. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  384. }
  385. for d in dict_data
  386. ]
  387. # 构建返回结果
  388. result = {
  389. "code": 200,
  390. "msg": "操作成功",
  391. "data": dict_data_list
  392. }
  393. return result
  394. except Exception as e:
  395. # 处理异常
  396. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  397. class DictDataUpdataForm(BaseModel):
  398. dictCode:str
  399. dictLabel:str= None
  400. dictValue:str= None
  401. cssClass:str= None
  402. listClass:str= None
  403. dictSort:int= None
  404. remark:str= None
  405. dictType:str = None
  406. isDefault:str = None
  407. @router.put("/data")
  408. async def updata_dict_type(
  409. form_data: DictDataUpdataForm,
  410. db: Session = Depends(get_db),
  411. body = Depends(remove_xss_json),
  412. user_id = Depends(valid_access_token)
  413. ):
  414. try:
  415. # 从请求数据创建一个新的 SysDictType 实例
  416. query = db.query(SysDictData)
  417. query = query.filter(SysDictData.dict_code == form_data.dictCode)
  418. query = query.filter(SysDictData.del_flag != '2')
  419. # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode)
  420. # query = db.query(SysDictData).filter(SysDictData.del_flag != '2')
  421. dict_data = query.first()
  422. if not dict_data:
  423. detail = "dict_data不存在"
  424. raise HTTPException(status_code=404, detail="dict_data不存在")
  425. if form_data.dictSort:
  426. dict_data.dict_sort = form_data.dictSort
  427. if form_data.dictLabel:
  428. dict_data.dict_label = form_data.dictLabel
  429. if form_data.dictValue:
  430. dict_data.dict_value = form_data.dictValue
  431. if form_data.dictType:
  432. dict_data.dict_type = form_data.dictType
  433. if form_data.cssClass:
  434. dict_data.css_class = form_data.cssClass
  435. if form_data.listClass:
  436. dict_data.list_class = form_data.listClass
  437. if form_data.remark:
  438. dict_data.remark = form_data.remark
  439. if form_data.isDefault:
  440. dict_data.is_default = form_data.isDefault
  441. # 添加到数据库会话并提交
  442. db.commit()
  443. # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  444. # 构建并返回响应
  445. return {
  446. "code": 200,
  447. "msg": "操作成功",
  448. "data": None # 根据你的响应示例,data 为 null
  449. }
  450. except Exception as e:
  451. # 处理异常
  452. if str(e)=='':
  453. e=detail
  454. raise HTTPException(status_code=500, detail=str(e))
  455. @router.delete("/data/delete/{dictCode}") # 使用 ID 来标识要删除的接口
  456. async def delete_dict_data(
  457. dictCode: str,
  458. db: Session = Depends(get_db),
  459. body = Depends(remove_xss_json),
  460. user_id = Depends(valid_access_token)
  461. ):
  462. try:
  463. # 从数据库中获取要删除的 OneShareApiEntity 实例
  464. query = db.query(SysDictData)
  465. query = query.filter(SysDictData.dict_code == dictCode)
  466. query = query.filter(SysDictData.del_flag != '2')
  467. dict_data = query.first()
  468. # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first()
  469. if not dict_data:
  470. detail = "dict_data不存在"
  471. raise HTTPException(status_code=404, detail="dict_data不存在")
  472. dict_data.del_flag = '2'
  473. # 删除实例
  474. # db.delete(api)
  475. db.commit()
  476. # 构建并返回响应
  477. return {
  478. "code": 200,
  479. "msg": "操作成功",
  480. "data": None
  481. }
  482. except Exception as e:
  483. # 处理异常
  484. if str(e)=='':
  485. e=detail
  486. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))