__init__.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends
  4. from fastapi.responses import JSONResponse
  5. from database import get_db
  6. from sqlalchemy.orm import Session
  7. from models import *
  8. import json
  9. router = APIRouter()
  10. @router.post('/create')
  11. async def mine(request: Request,db: Session = Depends(get_db)):
  12. data = await request.body()
  13. body = data.decode(encoding='utf-8')
  14. if len(body) > 0:
  15. body = json.loads(body)
  16. for param in ['jobName', 'groupName', 'enable', 'triggerType', 'readerPreSQL', 'readerSQL', 'readerPostSQL', 'writerPreSQL',
  17. 'writerPostSQL','columns','fromDbId','toDbId','toTableName','toTablePKColumns','writerBatchSize','defaultParamsMapJson','incrementColumns']:
  18. if param not in body :
  19. return JSONResponse(status_code=410, content={
  20. 'errcode': 410,
  21. 'errmsg': f'{param}为必填参数'
  22. })
  23. jobName = body['jobName']
  24. jobinfo = db.query(JobConfigEntity.jobName==jobName).first()
  25. if jobinfo is not None:
  26. return JSONResponse(status_code=410, content={
  27. 'code': 410,
  28. 'msg': f'任务名称{jobName}已存在'
  29. })
  30. groupName = body['groupName']
  31. enable = body['enable']
  32. if enable not in [0,1]:
  33. return JSONResponse(status_code=410, content={
  34. 'code': 410,
  35. 'msg': f'启用状态:{enable}超出值域,0关闭/1启动'
  36. })
  37. triggerType = body['triggerType']
  38. if triggerType not in [0,1]:
  39. return JSONResponse(status_code=410, content={
  40. 'code': 410,
  41. 'msg': f'更新机制:{triggerType}超出值域,1:手动触发;2:每时;3:每天;4:每周;5:每月;6:每年'
  42. })
  43. fromDbId = body['fromDbId']
  44. fromdatabase_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == fromDbId).first()
  45. if fromdatabase_info is None:
  46. return {
  47. 'errcode': 1,
  48. 'errmsg': f'数据库{fromDbId}没配置'
  49. }
  50. readerPreSQL = body['readerPreSQL']
  51. readerSQL = body['readerSQL']
  52. readerPostSQL = body['readerPostSQL']
  53. toDbId = body['toDbId']
  54. todatabase_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == toDbId).first()
  55. if todatabase_info is None:
  56. return {
  57. 'errcode': 1,
  58. 'errmsg': f'数据库{toDbId}没配置'
  59. }
  60. writerPreSQL = body['writerPreSQL']
  61. writerPostSQL = body['writerPostSQL']
  62. columns = body['columns']
  63. toTableName = body['toTableName']
  64. toTablePKColumns = body['toTablePKColumns']
  65. writerBatchSize = body['writerBatchSize']
  66. defaultParamsMapJson = body['defaultParamsMapJson']
  67. incrementColumns = body['incrementColumns']
  68. job = JobConfigEntity(jobName=jobName,groupName=groupName,enable=enable,triggerType=triggerType,fromDbId=fromDbId,readerPreSQL=readerPreSQL,readerSQL=readerSQL,
  69. readerPostSQL=readerPostSQL,toDbId=toDbId,writerPreSQL=writerPreSQL,writerPostSQL=writerPostSQL,columns=columns,toTableName=toTableName,
  70. toTablePKColumns=toTablePKColumns,writerBatchSize=writerBatchSize,defaultParamsMapJson=defaultParamsMapJson,incrementColumns=incrementColumns)
  71. db.add(job)
  72. db.commit()
  73. else:
  74. return None
  75. return {
  76. 'msg': '提交成功',
  77. 'code': 200
  78. }