#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Route for creating a job configuration record."""
import json

from fastapi import APIRouter, Request, Depends
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session

from database import get_db
from models import *

router = APIRouter()

# Keys that must be present in the request body. Every one of them is also
# passed to JobConfigEntity as a keyword argument of the same name.
_REQUIRED_PARAMS = (
    'jobName', 'groupName', 'enable', 'triggerType',
    'readerPreSQL', 'readerSQL', 'readerPostSQL',
    'writerPreSQL', 'writerPostSQL', 'columns',
    'fromDbId', 'toDbId', 'toTableName', 'toTablePKColumns',
    'writerBatchSize', 'defaultParamsMapJson', 'incrementColumns',
)

# Accepted values for 'enable' (0 = off, 1 = on, per the error message).
_ENABLE_VALUES = (0, 1)

# Accepted values for 'triggerType', taken from the domain spelled out in the
# handler's own error message (1..6).
_TRIGGER_TYPES = (1, 2, 3, 4, 5, 6)


def _rejected(msg):
    """Build the HTTP 410 response used for validation failures."""
    return JSONResponse(status_code=410, content={'code': 410, 'msg': msg})


def _datasource_exists(db, datasource_id):
    """Return True when a DatasourceEntity row with this id exists."""
    row = (
        db.query(DatasourceEntity)
        .filter(DatasourceEntity.id == datasource_id)
        .first()
    )
    return row is not None


@router.post('/create')
async def mine(request: Request, db: Session = Depends(get_db)):
    """Validate the posted JSON and insert a new JobConfigEntity row.

    Args:
        request: Incoming request; its raw body must be a UTF-8 encoded JSON
            object containing every key in ``_REQUIRED_PARAMS``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        * ``None`` when the request body is empty.
        * A ``JSONResponse`` with status 410 when the body is not a JSON
          object, a required key is missing, the job name is already taken,
          or ``enable`` / ``triggerType`` is outside its allowed values.
        * A plain dict with ``errcode`` 1 (default HTTP status) when
          ``fromDbId`` or ``toDbId`` matches no DatasourceEntity row.
        * ``{'msg': '提交成功', 'code': 200}`` after a successful commit.

    Raises:
        Exception: whatever ``db.commit()`` raises; the session is rolled
            back before the exception propagates.
    """
    data = await request.body()
    if not data:
        # An empty body is answered with a bare null, as before.
        return None

    try:
        body = json.loads(data.decode(encoding='utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return _rejected('请求体不是合法的JSON')
    if not isinstance(body, dict):
        # Lists, strings and numbers are valid JSON but cannot carry the
        # named parameters looked up below.
        return _rejected('请求体必须是JSON对象')

    for param in _REQUIRED_PARAMS:
        if param not in body:
            # This response keeps its original 'errcode'/'errmsg' keys.
            return JSONResponse(status_code=410, content={
                'errcode': 410,
                'errmsg': f'{param}为必填参数'
            })

    jobName = body['jobName']
    # Filter on the name; querying the comparison expression itself would
    # return a row for any non-empty table, whether or not the name matches.
    jobinfo = (
        db.query(JobConfigEntity)
        .filter(JobConfigEntity.jobName == jobName)
        .first()
    )
    if jobinfo is not None:
        return _rejected(f'任务名称{jobName}已存在')

    enable = body['enable']
    if enable not in _ENABLE_VALUES:
        return _rejected(f'启用状态:{enable}超出值域,0关闭/1启动')

    triggerType = body['triggerType']
    if triggerType not in _TRIGGER_TYPES:
        return _rejected(
            f'更新机制:{triggerType}超出值域,1:手动触发;2:每时;3:每天;4:每周;5:每月;6:每年'
        )

    fromDbId = body['fromDbId']
    if not _datasource_exists(db, fromDbId):
        return {
            'errcode': 1,
            'errmsg': f'数据库{fromDbId}没配置'
        }

    toDbId = body['toDbId']
    if not _datasource_exists(db, toDbId):
        return {
            'errcode': 1,
            'errmsg': f'数据库{toDbId}没配置'
        }

    # Only the known parameters are forwarded; extra keys in the body are
    # ignored.
    job = JobConfigEntity(**{name: body[name] for name in _REQUIRED_PARAMS})
    db.add(job)
    try:
        db.commit()
    except Exception:
        # Leave the session usable for whoever handles the error.
        db.rollback()
        raise

    return {
        'msg': '提交成功',
        'code': 200
    }