__init__.py 15 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
  4. from pydantic import BaseModel
  5. from database import get_db
  6. from sqlalchemy.orm import Session
  7. from sqlalchemy import and_, or_
  8. from typing import List
  9. from models import *
  10. import json
  11. from sqlalchemy.sql import func
  12. router = APIRouter()
  13. @router.get('/type/list')
  14. async def get_dict_types(
  15. db: Session = Depends(get_db), # 假设 get_db 是获取数据库会话的依赖项
  16. pageNum: int = Query(1, gt=0),
  17. pageSize: int = Query(10, gt=0),
  18. dictName: str = Query(None, max_length=100),
  19. dictType: str = Query(None, max_length=100),
  20. beginTime: datetime = Query(None),
  21. endTime: datetime = Query(None)
  22. ):
  23. try:
  24. # 构建查询
  25. query = db.query(SysDictType)
  26. query = query.filter(SysDictType.del_flag!='2')
  27. # 添加查询条件
  28. if dictName:
  29. query = query.filter(SysDictType.dict_name.like(f'%{dictName}%'))
  30. if dictType:
  31. query = query.filter(SysDictType.dict_type.like(f'%{dictType}%'))
  32. if beginTime and endTime:
  33. query = query.filter(
  34. and_(
  35. SysDictType.create_time >= beginTime,
  36. SysDictType.create_time <= endTime
  37. )
  38. )
  39. # 计算总记录数
  40. total_count = query.count()
  41. # 计算分页
  42. offset = (pageNum - 1) * pageSize
  43. dict_types = query.offset(offset).limit(pageSize).all()
  44. # 转换为字典
  45. dict_type_dicts = [
  46. {
  47. "dictId": d.dict_id,
  48. "dictName": d.dict_name,
  49. "dictType": d.dict_type,
  50. "remark": d.remark,
  51. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  52. }
  53. for d in dict_types
  54. ]
  55. # 构建返回结果
  56. result = {
  57. "total": total_count,
  58. "rows": dict_type_dicts,
  59. "code": 200,
  60. "msg": "查询成功"
  61. }
  62. return result
  63. except Exception as e:
  64. # 处理异常
  65. raise HTTPException(status_code=500, detail=str(e))
  66. @router.get('/type/optionselect')
  67. async def get_dict_types_optionselect(
  68. db: Session = Depends(get_db), # 假设 get_db 是获取数据库会话的依赖项
  69. ):
  70. try:
  71. # 构建查询
  72. query = db.query(SysDictType)
  73. query = query.filter(SysDictType.del_flag!='2')
  74. # 计算总记录数
  75. dict_types = query.all()
  76. # 转换为字典
  77. dict_type_dicts = [
  78. {
  79. "dictId": d.dict_id,
  80. "dictName": d.dict_name,
  81. "dictType": d.dict_type,
  82. "remark": d.remark,
  83. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  84. }
  85. for d in dict_types
  86. ]
  87. # 构建返回结果
  88. result = {
  89. "data": dict_type_dicts,
  90. "code": 200,
  91. "msg": "查询成功"
  92. }
  93. return result
  94. except Exception as e:
  95. # 处理异常
  96. raise HTTPException(status_code=500, detail=str(e))
  97. @router.get('/type/{dictId}')
  98. async def get_dict_types_optionselect(
  99. dictId: str,
  100. db: Session = Depends(get_db), # 假设 get_db 是获取数据库会话的依赖项
  101. ):
  102. try:
  103. # 构建查询
  104. query = db.query(SysDictType)
  105. query = query.filter(SysDictType.dict_id==dictId)
  106. query = query.filter(SysDictType.del_flag!='2')
  107. dict_types = query.first()
  108. # 转换为字典
  109. dict_type_dicts ={
  110. "dictId": dict_types.dict_id,
  111. "dictName": dict_types.dict_name,
  112. "dictType": dict_types.dict_type,
  113. "remark": dict_types.remark,
  114. "createTime": dict_types.create_time.strftime('%Y-%m-%d %H:%M:%S') if dict_types.create_time else ''
  115. }
  116. # 构建返回结果
  117. result = {
  118. "data": dict_type_dicts,
  119. "code": 200,
  120. "msg": "查询成功"
  121. }
  122. return result
  123. except Exception as e:
  124. # 处理异常
  125. raise HTTPException(status_code=500, detail=str(e))
  126. class DictTypeCreateForm(BaseModel):
  127. dictName: str
  128. dictType: str
  129. remark: str = None
  130. @router.post("/type")
  131. async def create_dict_type(
  132. form_data: DictTypeCreateForm,
  133. db: Session = Depends(get_db)
  134. ):
  135. try:
  136. # 从请求数据创建一个新的 SysDictType 实例
  137. new_dict_type = SysDictType(
  138. dict_name=form_data.dictName,
  139. dict_type=form_data.dictType,
  140. remark=form_data.remark,
  141. create_time=datetime.now(), # 假设自动设置创建时间
  142. update_time=datetime.now() # 假设自动设置更新时间
  143. )
  144. # 添加到数据库会话并提交
  145. db.add(new_dict_type)
  146. db.commit()
  147. db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  148. # 构建并返回响应
  149. return {
  150. "code": 200,
  151. "msg": "操作成功",
  152. "data": None # 根据你的响应示例,data 为 null
  153. }
  154. except Exception as e:
  155. # 处理异常
  156. raise HTTPException(status_code=500, detail=str(e))
  157. class DictTypeUpdataForm(BaseModel):
  158. createTime : str
  159. dictId : str
  160. dictName: str
  161. dictType: str
  162. remark: str = None
  163. @router.put("/type")
  164. async def updata_dict_type(
  165. form_data: DictTypeUpdataForm,
  166. db: Session = Depends(get_db)
  167. ):
  168. try:
  169. # 从请求数据创建一个新的 SysDictType 实例
  170. dict_type = db.query(SysDictType).filter(SysDictType.dict_id == form_data.dictId and SysDictType.del_flag != '2').first()
  171. if not dict_type:
  172. detail = "dict_type不存在"
  173. raise HTTPException(status_code=404, detail="dict_type不存在")
  174. dict_type.dict_name = form_data.dictName
  175. dict_type.dict_name = form_data.dictType
  176. dict_type.remark = form_data.remark
  177. dict_type.update_time = datetime.now()
  178. # 添加到数据库会话并提交
  179. db.commit()
  180. # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  181. # 构建并返回响应
  182. return {
  183. "code": 200,
  184. "msg": "操作成功",
  185. "data": None # 根据你的响应示例,data 为 null
  186. }
  187. except Exception as e:
  188. # 处理异常
  189. if str(e)=='':
  190. e=detail
  191. raise HTTPException(status_code=500, detail=str(e))
  192. @router.delete("/type/delete/{dictId}") # 使用 ID 来标识要删除的接口
  193. async def delete_dict_type(
  194. dictId: int,
  195. db: Session = Depends(get_db)
  196. ):
  197. try:
  198. # 从数据库中获取要删除的 OneShareApiEntity 实例
  199. dict_type = db.query(SysDictType).filter(SysDictType.dict_id == dictId and SysDictType.del_flag != '2').first()
  200. if not dict_type:
  201. detail = "dict_type不存在"
  202. raise HTTPException(status_code=404, detail="dict_type不存在")
  203. dict_type.del_flag = '2'
  204. # 删除实例
  205. # db.delete(api)
  206. db.commit()
  207. # 构建并返回响应
  208. return {
  209. "code": 200,
  210. "msg": "操作成功",
  211. "data": None
  212. }
  213. except Exception as e:
  214. # 处理异常
  215. if str(e)=='':
  216. e=detail
  217. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  218. @router.get('/data/list')
  219. async def get_dict_data_by_type(
  220. dictType: str = Query(None, max_length=100),
  221. pageNum: int = Query(1, gt=0),
  222. pageSize: int = Query(10, gt=0),
  223. db: Session = Depends(get_db),
  224. ):
  225. try:
  226. # 根据 dict_type 查询字典数据
  227. # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all()
  228. query = db.query(SysDictData)
  229. # 添加查询条件
  230. # if dictType:
  231. query = query.filter(SysDictData.dict_type.like(f'%{dictType}%'))
  232. query = query.filter(SysDictData.del_flag != '2')
  233. # 计算总记录数
  234. total_count = query.count()
  235. # 计算分页
  236. offset = (pageNum - 1) * pageSize
  237. dict_data = query.offset(offset).limit(pageSize).all()
  238. # 转换为字典
  239. dict_data_list = [
  240. {
  241. "dictCode": d.dict_code,
  242. "dictSort": d.dict_sort,
  243. "dictLabel": d.dict_label,
  244. "dictValue": d.dict_value,
  245. "dictType": d.dict_type,
  246. "cssClass": d.css_class,
  247. "listClass": d.list_class,
  248. "isDefault": d.is_default,
  249. "remark": d.remark,
  250. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  251. }
  252. for d in dict_data
  253. ]
  254. # 构建返回结果
  255. result = {
  256. "total": total_count,
  257. "rows": dict_data_list,
  258. "code": 200,
  259. "msg": "查询成功"
  260. }
  261. return result
  262. except Exception as e:
  263. # 处理异常
  264. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  265. class DictDataCreateForm(BaseModel):
  266. dictLabel:str
  267. dictValue:str
  268. cssClass:str
  269. listClass:str
  270. dictSort:int
  271. remark:str
  272. dictType:str
  273. @router.post('/data')
  274. async def create_dict_data(
  275. form_data: DictDataCreateForm,
  276. db: Session = Depends(get_db),
  277. ):
  278. try:
  279. # 创建一个新的 SysDictData 实例
  280. new_dict_data = SysDictData(
  281. dict_sort=form_data.dictSort,
  282. dict_label=form_data.dictLabel,
  283. dict_value=form_data.dictValue,
  284. dict_type=form_data.dictType,
  285. css_class=form_data.cssClass,
  286. list_class=form_data.listClass,
  287. remark=form_data.remark
  288. )
  289. # 添加到会话并提交
  290. db.add(new_dict_data)
  291. db.commit()
  292. db.refresh(new_dict_data) # 可选,如果需要刷新实例状态
  293. # 构建返回结果
  294. result = {
  295. "code": 200,
  296. "msg": "操作成功",
  297. "data": None
  298. }
  299. return result
  300. except Exception as e:
  301. # 处理异常
  302. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  303. @router.get('/data/type/{dict_type}')
  304. async def get_dict_data_by_type(
  305. dict_type: str,
  306. db: Session = Depends(get_db),
  307. ):
  308. try:
  309. # 根据 dict_type 查询字典数据
  310. query = db.query(SysDictData)
  311. query = query.filter(SysDictData.dict_type==dict_type)
  312. query = query.filter(SysDictData.del_flag != '2')
  313. # dict_data = db.query(SysDictData).filter_by(dict_type==dict_type and del_flag != '2').all()
  314. dict_data = query.all()
  315. # 将模型转换为字典
  316. dict_data_list = [
  317. {
  318. "dictCode": d.dict_code,
  319. "dictSort": d.dict_sort,
  320. "dictLabel": d.dict_label,
  321. "dictValue": d.dict_value,
  322. "dictType": d.dict_type,
  323. "cssClass": d.css_class,
  324. "listClass": d.list_class,
  325. "isDefault": d.is_default,
  326. "remark": d.remark,
  327. "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
  328. }
  329. for d in dict_data
  330. ]
  331. # 构建返回结果
  332. result = {
  333. "code": 200,
  334. "msg": "操作成功",
  335. "data": dict_data_list
  336. }
  337. return result
  338. except Exception as e:
  339. # 处理异常
  340. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
  341. class DictDataUpdataForm(BaseModel):
  342. dictCode:str
  343. dictLabel:str= None
  344. dictValue:str= None
  345. cssClass:str= None
  346. listClass:str= None
  347. dictSort:int= None
  348. remark:str= None
  349. dictType:str = None
  350. isDefault:str = None
  351. @router.put("/data")
  352. async def updata_dict_type(
  353. form_data: DictDataUpdataForm,
  354. db: Session = Depends(get_db)
  355. ):
  356. try:
  357. # 从请求数据创建一个新的 SysDictType 实例
  358. query = db.query(SysDictData)
  359. query = query.filter(SysDictData.dict_code == form_data.dictCode)
  360. query = query.filter(SysDictData.del_flag != '2')
  361. # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode)
  362. # query = db.query(SysDictData).filter(SysDictData.del_flag != '2')
  363. dict_data = query.first()
  364. if not dict_data:
  365. detail = "dict_data不存在"
  366. raise HTTPException(status_code=404, detail="dict_data不存在")
  367. if form_data.dictSort:
  368. dict_data.dict_sort = form_data.dictSort
  369. if form_data.dictLabel:
  370. dict_data.dict_label = form_data.dictLabel
  371. if form_data.dictValue:
  372. dict_data.dict_value = form_data.dictValue
  373. if form_data.dictType:
  374. dict_data.dict_type = form_data.dictType
  375. if form_data.cssClass:
  376. dict_data.css_class = form_data.cssClass
  377. if form_data.listClass:
  378. dict_data.list_class = form_data.listClass
  379. if form_data.remark:
  380. dict_data.remark = form_data.remark
  381. if form_data.isDefault:
  382. dict_data.is_default = form_data.isDefault
  383. # 添加到数据库会话并提交
  384. db.commit()
  385. # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态
  386. # 构建并返回响应
  387. return {
  388. "code": 200,
  389. "msg": "操作成功",
  390. "data": None # 根据你的响应示例,data 为 null
  391. }
  392. except Exception as e:
  393. # 处理异常
  394. if str(e)=='':
  395. e=detail
  396. raise HTTPException(status_code=500, detail=str(e))
  397. @router.delete("/data/delete/{dictCode}") # 使用 ID 来标识要删除的接口
  398. async def delete_dict_data(
  399. dictCode: str,
  400. db: Session = Depends(get_db)
  401. ):
  402. try:
  403. # 从数据库中获取要删除的 OneShareApiEntity 实例
  404. query = db.query(SysDictData)
  405. query = query.filter(SysDictData.dict_code == dictCode)
  406. query = query.filter(SysDictData.del_flag != '2')
  407. dict_data = query.first()
  408. # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first()
  409. if not dict_data:
  410. detail = "dict_data不存在"
  411. raise HTTPException(status_code=404, detail="dict_data不存在")
  412. dict_data.del_flag = '2'
  413. # 删除实例
  414. # db.delete(api)
  415. db.commit()
  416. # 构建并返回响应
  417. return {
  418. "code": 200,
  419. "msg": "操作成功",
  420. "data": None
  421. }
  422. except Exception as e:
  423. # 处理异常
  424. if str(e)=='':
  425. e=detail
  426. raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))