12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends
- from fastapi.responses import JSONResponse
- from database import get_db
- from sqlalchemy.orm import Session
- from models import *
- import json
# Router instance on which this module's endpoints (e.g. POST /create) are registered.
- router = APIRouter()
@router.post('/create')
async def mine(request: Request, db: Session = Depends(get_db)):
    """Create a job configuration from the JSON object in the request body.

    The body must contain every field listed in ``required_fields``.
    Validation runs in this order and stops at the first failure:

    1. every required field is present (410, ``errcode``/``errmsg`` keys);
    2. no job with the same ``jobName`` exists yet (410);
    3. ``enable`` is 0 or 1 (410);
    4. ``triggerType`` is one of 1..6 (410);
    5. ``fromDbId`` and ``toDbId`` each match a ``DatasourceEntity`` row
       (otherwise a plain dict with ``errcode`` 1 is returned).

    Args:
        request: Incoming request; its raw body is decoded as UTF-8 JSON.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        ``None`` when the request body is empty, a ``JSONResponse`` or dict
        describing the first validation failure, or a success dict once the
        job has been added and committed.

    Raises:
        json.JSONDecodeError: If the body is not valid JSON (not handled here).
    """
    raw = await request.body()
    text = raw.decode(encoding='utf-8')
    if not text:
        # Empty bodies are deliberately ignored rather than rejected.
        return None
    body = json.loads(text)

    # These names are both the mandatory request fields and the keyword
    # arguments handed to JobConfigEntity below.
    required_fields = (
        'jobName', 'groupName', 'enable', 'triggerType',
        'readerPreSQL', 'readerSQL', 'readerPostSQL',
        'writerPreSQL', 'writerPostSQL', 'columns',
        'fromDbId', 'toDbId', 'toTableName', 'toTablePKColumns',
        'writerBatchSize', 'defaultParamsMapJson', 'incrementColumns',
    )
    for param in required_fields:
        if param not in body:
            return JSONResponse(status_code=410, content={
                'errcode': 410,
                'errmsg': f'{param}为必填参数'
            })

    jobName = body['jobName']
    # Query the entity and filter by name. Passing the comparison itself to
    # query() would select a boolean for every row and report a duplicate as
    # soon as the table contains any job at all.
    existing_job = (
        db.query(JobConfigEntity)
        .filter(JobConfigEntity.jobName == jobName)
        .first()
    )
    if existing_job is not None:
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': f'任务名称{jobName}已存在'
        })

    enable = body['enable']
    if enable not in (0, 1):
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': f'启用状态:{enable}超出值域,0关闭/1启动'
        })

    triggerType = body['triggerType']
    # Valid trigger types are 1..6, as spelled out in the error message.
    if triggerType not in (1, 2, 3, 4, 5, 6):
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': f'更新机制:{triggerType}超出值域,1:手动触发;2:每时;3:每天;4:每周;5:每月;6:每年'
        })

    # Both the source and the target datasource must already be configured.
    fromDbId = body['fromDbId']
    source_db = db.query(DatasourceEntity).filter(DatasourceEntity.id == fromDbId).first()
    if source_db is None:
        return {
            'errcode': 1,
            'errmsg': f'数据库{fromDbId}没配置'
        }

    toDbId = body['toDbId']
    target_db = db.query(DatasourceEntity).filter(DatasourceEntity.id == toDbId).first()
    if target_db is None:
        return {
            'errcode': 1,
            'errmsg': f'数据库{toDbId}没配置'
        }

    job = JobConfigEntity(**{name: body[name] for name in required_fields})
    db.add(job)
    db.commit()
    return {
        'msg': '提交成功',
        'code': 200
    }
|