12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from re import S
- from fastapi import APIRouter, Request, Depends, Form, Body, File, UploadFile, BackgroundTasks
- from fastapi.responses import Response
- from fastapi.responses import JSONResponse
- from database import get_db
- from sqlalchemy.orm import Session
- from utils import *
- from models import *
- from urllib import parse
- from common.DBgetdata import dbgetdata
- from pprint import pprint
- from datetime import datetime
- # import pandas as pd
- from pydantic import BaseModel
- import sqlalchemy
- import pymysql
- import json,time,io
- import time, os
- import math
- import uuid
- import re
- from . import sign_api
- router = APIRouter()
- router.include_router(sign_api.router)
def contains_special_characters(input_string, special_characters=";|&|$|#|'|@| "):
    """Report whether ``input_string`` contains any forbidden character.

    Every character of ``special_characters`` is escaped and placed inside a
    single regex character class, so each one is matched literally. Because
    the whole string goes into the class, the ``|`` used as a separator in
    the default value is itself treated as a forbidden character.

    :param input_string: the string to inspect
    :param special_characters: the characters to look for
    :return: True if at least one of them occurs in ``input_string``,
        otherwise False
    """
    char_class = '[' + re.escape(special_characters) + ']'
    return re.search(char_class, input_string) is not None
@router.get('/v1/demo')
@router.post('/v1/demo')
async def test1(request: Request):
    """Demo endpoint that echoes the request body back to the caller.

    A non-empty body is parsed as JSON, printed twice for debugging and
    returned as the parsed value; an empty body is returned as the empty
    string it decoded to.
    """
    raw = await request.body()
    text = raw.decode(encoding='utf-8')
    if not text:
        # Nothing to parse: hand back the empty string unchanged.
        return text
    payload = json.loads(text)
    print(payload)
    print([payload, {'msg': 'good'}])
    return payload
@router.post('/create_api_service')
async def create_api_service(
        request: Request,
        db: Session = Depends(get_db)
):
    """Register a new API service with its SQL command and datasource.

    The JSON request body must contain ``servicename``, ``sqlname``,
    ``dbtype`` and ``sqltext``. When ``dbtype`` is ``'pymysql'`` it must also
    contain ``host``, ``user``, ``password``, ``database``, ``port`` and
    ``charset``, which are stored as a new datasource row.

    :param request: incoming request whose body carries the JSON payload
    :param db: SQLAlchemy session provided by ``get_db``
    :return: the generated service id (a UUID4 string without dashes), or a
        410 ``JSONResponse`` naming the first missing field
    """
    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    if len(body) > 0:
        body = json.loads(body)

    # An empty body stays a str and a JSON array/scalar is not a mapping;
    # both used to crash on body['servicename'] with an unhandled error.
    is_mapping = isinstance(body, dict)
    required = ['servicename', 'sqlname', 'dbtype', 'sqltext']
    if is_mapping and body.get('dbtype') == 'pymysql':
        required += ['host', 'user', 'password', 'database', 'port', 'charset']
    for param in required:
        # Only presence is checked: empty values (e.g. an empty password)
        # were accepted before and still are.
        if not is_mapping or param not in body:
            return JSONResponse(status_code=410, content={
                'errcode': 410,
                'errmsg': f'{param}为必填参数,不能不存在'
            })

    serviceid = str(uuid.uuid4()).replace("-", "")
    datasource = str(uuid.uuid1())
    command_id = str(uuid.uuid4())
    dbtype = body['dbtype']

    if dbtype == 'pymysql':
        db.add(DatasourceEntity(
            id=datasource,
            name=body['sqlname'],
            scope=serviceid,
            dbtype=dbtype,
            host=body['host'],
            user=body['user'],
            password=body['password'],
            database=body['database'],
            port=body['port'],
            charset=body['charset'],
        ))
    # NOTE(review): for any other dbtype no datasource row is written, yet
    # the command below still points at the freshly generated datasource id.
    # Kept as-is for backward compatibility -- confirm whether such requests
    # should be rejected instead.

    db.add(ApiServiceEntity(id=serviceid, name=body['servicename']))
    db.add(CommandEntity(id=command_id, sqltext=body['sqltext'],
                         datasource=datasource, scope=serviceid))
    db.commit()
    return serviceid
@router.get('/v1/{service_code:path}')
@router.post('/v1/{service_code:path}')
async def v1(request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db)):
    """Run the SQL template configured for ``service_code`` and return a page of rows.

    ``{name}`` placeholders in the stored SQL text are replaced with values
    taken first from the query string and then from the JSON body. ``page``
    and ``limit`` (when both are supplied) select which slice of the fetched
    rows is returned; otherwise all rows are returned as a single page.

    SECURITY: parameter values are spliced into the SQL text by plain string
    replacement and are not escaped, so this endpoint is open to SQL
    injection by any caller who can reach it.

    :param request: incoming request (path parameter, query string, body)
    :param background_tasks: accepted for interface compatibility; not used
        in this function
    :param db: SQLAlchemy session provided by ``get_db``
    :return: ``{'code', 'errcode', 'errmsg', 'data': {...}}`` on success, or
        an ``{'errcode': 1, 'errmsg': ...}`` dict when the service, command,
        datasource or driver is not usable
    """
    service_code = request.path_params.get('service_code')
    query_list = parse.parse_qsl(str(request.query_params))
    print(query_list)
    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    if len(body) > 0:
        print('[body]', body)
        body = json.loads(body)

    service_info = db.query(ApiServiceEntity).filter(ApiServiceEntity.id == service_code).first()
    if service_info is None:
        return {
            'errcode': 1,
            'errmsg': '服务不存在'
        }
    print('service_name', service_info.name)

    command_info = db.query(CommandEntity).filter(CommandEntity.scope == service_code).first()
    if command_info is None:
        return {
            'errcode': 1,
            'errmsg': '查询命令没配置'
        }

    database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == command_info.datasource).first()
    if database_info is None:
        return {
            'errcode': 1,
            'errmsg': '数据库没配置'
        }

    sql = command_info.sqltext
    print('sql ==== << ', sql)
    meta_data = {}

    # Parameters taken from the query string.
    for key, val in query_list:
        placeholder = "{" + key + "}"
        if key == 'access_token':
            # Never substituted into the SQL text.
            continue
        if key == 'page':
            meta_data['page'] = int(val)
        elif key == 'limit':
            meta_data['limit'] = int(val)
        elif key == 'create_time':
            # '全部' disables the filter by blanking the placeholder.
            if val != '全部':
                sql = sql.replace(placeholder, "DATE_FORMAT(create_time,'%Y-%m-%d') ='" + val + "' and")
            else:
                sql = sql.replace(placeholder, '')
        elif key in ('record_name', 'record_cid'):
            if val != '全部':
                sql = sql.replace(placeholder, key + "='" + val + "' and")
            else:
                sql = sql.replace(placeholder, '')
        else:
            sql = sql.replace(placeholder, val.replace('全部', ''))

    # Parameters taken from the JSON body.
    if len(body) > 0:
        print(body)
        for key, val in body.items():
            placeholder = "{" + key + "}"
            if key == 'page1' and 'limit1' in body:
                # {page1} is replaced by the row offset (page1 - 1) * limit1.
                if val == '':
                    val = '0'
                page1 = int(val)
                limit1 = int(body['limit1'])
                print(str((page1 - 1) * limit1))
                sql = sql.replace('{page1}', str((page1 - 1) * limit1))
            elif key == 'taskNo' and val == '':
                # This check must precede the generic branch below, which
                # previously captured 'taskNo' and made it unreachable.
                print("他们又输入空的taskNo啦!")
                return [{"text": "兄弟,taskNo不能为空"}]
            elif key == 'create_time':
                if val != '全部':
                    sql = sql.replace(placeholder, "DATE_FORMAT(create_time,'%Y-%m-%d') ='" + val + "' and")
                else:
                    sql = sql.replace(placeholder, '')
            elif key == 'page':
                if isinstance(val, str):
                    meta_data['page'] = int(val)
                elif isinstance(val, int):
                    meta_data['page'] = val
            elif key == 'limit':
                if isinstance(val, str):
                    meta_data['limit'] = int(val)
                elif isinstance(val, int):
                    meta_data['limit'] = val
            else:
                if isinstance(val, (int, float)):
                    val = str(val)
                sql = sql.replace(placeholder, val)
    print('sql ==== >> ', sql)

    # Only the pymysql driver is implemented. Other drivers used to fall
    # through to the final return and fail with NameError on 'pages'.
    if database_info.dbtype != 'pymysql':
        return {
            'errcode': 1,
            'errmsg': '接口对应数据库暂不支持'
        }

    conn = pymysql.connect(host=database_info.host,
                           user=database_info.user,
                           password=database_info.password,
                           database=database_info.database,
                           port=database_info.port,
                           charset=database_info.charset)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
        # Column names, in result order.
        colnames = [desc[0] for desc in cur.description]
    finally:
        # Release the connection even when the query fails.
        if conn.open:
            conn.close()

    total = len(rows)   # total number of rows fetched
    pages = 1           # total number of pages
    current = 1         # page being returned
    size = total        # page size; defaults to "everything on one page"
    if 'page' in meta_data and 'limit' in meta_data:
        current = meta_data['page']
        size = meta_data['limit']
        # Unusable paging values fall back to the first page of five rows
        # (previously only zero was caught; negatives indexed from the end).
        if (current <= 0 or size <= 0) and total != 0:
            current = 1
            size = 5
        if size > 0:
            pages, remainder = divmod(total, size)
            if remainder:
                pages += 1
        else:
            # Only reachable with an empty result; avoids dividing by zero.
            pages = 0

    # Slicing clamps at the end of the result, so a page past the last one
    # yields an empty list instead of raising IndexError.
    start_index = (current - 1) * size
    page_rows = rows[start_index:start_index + size] if total else []
    data = [dict(zip(colnames, row)) for row in page_rows]

    return {
        'code': 0,
        'errcode': 0,
        'errmsg': '查询成功',
        'data': {"list": data,
                 'pages': pages,          # total number of pages
                 'currentPage': current,  # current page number
                 'total': total,          # total number of rows
                 'pageSize': size         # rows per page
                 }
    }
class CreateApiServiceFrom(BaseModel):
    """Pydantic request model for ``POST /v2/create_api_service``."""

    # Id of the DatasourceEntity row the SQL will run against.
    datasource: str
    # Name stored on the created ApiServiceEntity.
    name: str
    # SQL text stored on the created CommandEntity.
    sqltext: str
    # Optional caller-chosen service id; the handler generates one when None.
    serviceid: str = None
@router.post('/v2/create_api_service')
async def create_api_service_v2(
        form_data: CreateApiServiceFrom,
        request: Request,
        db: Session = Depends(get_db)
):
    """Create an API service bound to an existing datasource.

    The SQL text is trial-run against the datasource with `` limit 0;``
    appended, which both validates it and yields the result column names.

    :param form_data: validated payload (datasource, name, sqltext and an
        optional serviceid)
    :param request: incoming request; its raw body is re-read to reject
        empty-string values
    :param db: SQLAlchemy session provided by ``get_db``
    :return: ``[serviceid, colnames]`` on success; a plain error string for
        a missing/empty field, an unsupported driver or a failed trial run;
        a 410 ``JSONResponse`` when the datasource does not exist
    """
    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    if len(body) > 0:
        body = json.loads(body)
    for param in ['datasource', 'name', 'sqltext']:
        if param not in body or body[param] == '':
            return f'{param}为必填参数,不能不存在'

    # Validate the datasource.
    datasource = form_data.datasource
    database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == datasource).first()
    if database_info is None:
        return JSONResponse(status_code=410, content={
            'errcode': 410,
            'errmsg': f'数据库-{datasource}-不存在'
        })

    # Build or take over the identifiers of the new service.
    if form_data.serviceid is None:
        serviceid = str(uuid.uuid4()).replace("-", "")
    else:
        serviceid = form_data.serviceid
    serviname = form_data.name
    command_id = str(uuid.uuid4())
    sqltext = form_data.sqltext

    if database_info.dbtype != 'pymysql':
        return '暂不支持除pymysql以外的驱动'

    # Trial-run the SQL; the connection is closed on every exit path.
    conn = None
    try:
        conn = pymysql.connect(host=database_info.host,
                               user=database_info.user,
                               password=database_info.password,
                               database=database_info.database,
                               port=database_info.port,
                               charset=database_info.charset)
        cur = conn.cursor()
        sql = sqltext + ' limit 0;'
        print(sql)
        cur.execute(sql)
        colnames = [desc[0] for desc in cur.description]
        cur.close()
    except Exception as e:
        # Any failure (connect, syntax, non-SELECT statement) is reported
        # back to the caller as text, as before.
        return f"未知错误: {e}"
    finally:
        if conn is not None and conn.open:
            conn.close()

    create_command = CommandEntity(id=command_id, sqltext=sqltext, datasource=datasource, scope=serviceid)
    create_api = ApiServiceEntity(id=serviceid, name=serviname, status=1)
    db.add(create_api)
    db.add(create_command)
    db.commit()
    return [serviceid, colnames]
@router.post('/v2/update_api_service')
async def update_api_service_v2(
        request: Request,
        db: Session = Depends(get_db)
):
    """Update an existing API service and its SQL command.

    The JSON body must contain non-empty ``id``, ``datasource``, ``scope``
    (the service id), ``name``, ``sqltext`` and ``status``. The new SQL text
    is trial-run with `` limit 0;`` appended before anything is saved.

    :param request: incoming request whose body carries the JSON payload
    :param db: SQLAlchemy session provided by ``get_db``
    :return: ``[serviceid, colnames]`` on success; a plain error string for
        a missing/empty field, an unsupported driver or a failed trial run;
        a 410 ``JSONResponse`` when the datasource, service or command does
        not exist
    """
    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    if len(body) > 0:
        body = json.loads(body)
    for param in ['id', 'datasource', 'scope', 'name', 'sqltext', 'status']:
        if param not in body or body[param] == '':
            return f'{param}为必填参数,不能不存在'

    # Validate the datasource.
    datasource = body['datasource']
    database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == datasource).first()
    if database_info is None:
        return JSONResponse(status_code=410, content={
            'errcode': 410,
            'errmsg': f'数据库-{datasource}-不存在'
        })

    # The service and its command must already exist.
    serviceid = body['scope']
    service_info = db.query(ApiServiceEntity).filter(ApiServiceEntity.id == serviceid).first()
    if service_info is None:
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': f'servicecode{serviceid}服务不存在'
        })
    command_info = db.query(CommandEntity).filter(CommandEntity.scope == serviceid).first()
    if command_info is None:
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': f'servicecode{serviceid}服务不存在'
        })

    command_id = body['id']
    serviname = body['name']
    sqltext = body['sqltext']
    status = body['status']

    if database_info.dbtype != 'pymysql':
        return '暂不支持除pymysql以外的驱动'

    # Trial-run the SQL; the connection is closed on every exit path.
    conn = None
    try:
        conn = pymysql.connect(host=database_info.host,
                               user=database_info.user,
                               password=database_info.password,
                               database=database_info.database,
                               port=database_info.port,
                               charset=database_info.charset)
        cur = conn.cursor()
        sql = sqltext + ' limit 0;'
        print(sql)
        cur.execute(sql)
        colnames = [desc[0] for desc in cur.description]
        cur.close()
    except Exception as e:
        # Any failure is reported back to the caller as text, as before.
        return f"未知错误: {e}"
    finally:
        if conn is not None and conn.open:
            conn.close()

    # Apply the new values to the rows loaded above.
    service_info.id = serviceid
    service_info.name = serviname
    service_info.status = status
    command_info.id = command_id
    command_info.sqltext = sqltext
    command_info.datasource = datasource
    command_info.scope = serviceid
    db.commit()
    return [serviceid, colnames]
@router.post('/v2/create_datasource')
async def create_create_datasource_v2(
        request: Request,
        db: Session = Depends(get_db)
):
    """Register a new datasource after checking that it is reachable.

    The JSON body must contain non-empty ``datasourcename``, ``dbtype``,
    ``host``, ``user``, ``password``, ``database``, ``port`` and ``charset``.
    Connectivity is verified by running ``select now()`` on the target.

    :param request: incoming request whose body carries the JSON payload
    :param db: SQLAlchemy session provided by ``get_db``
    :return: ``[datasource_id, testresult]`` on success; a plain error
        string for a missing/empty field, an unsupported ``dbtype`` or a
        failed connectivity check
    """
    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    if len(body) > 0:
        body = json.loads(body)

    # Check the input parameters.
    for param in ['datasourcename', 'dbtype', 'host', 'user', 'password', 'database', 'port', 'charset']:
        if param not in body or body[param] == '':
            return f'{param}为必填参数,不能不存在'

    datasource = str(uuid.uuid1())
    datasourcename = body['datasourcename']
    dbtype = body['dbtype']
    if dbtype != 'pymysql':
        return f'暂不支持{dbtype}'

    host = body['host']
    user = body['user']
    password = body['password']
    database = body['database']
    port = body['port']
    charset = body['charset']

    # Connectivity check; the connection is closed on every exit path.
    conn = None
    try:
        conn = pymysql.connect(host=host,
                               user=user,
                               password=password,
                               database=database,
                               port=port,
                               charset=charset)
        cur = conn.cursor()
        sql = ' select now()'
        print(sql)
        cur.execute(sql)
        testresult = cur.fetchall()
        cur.close()
    except Exception as e:
        # Any failure is reported back to the caller as text, as before.
        return f"未知错误: {e}"
    finally:
        if conn is not None and conn.open:
            conn.close()

    create_datasource = DatasourceEntity(id=datasource, name=datasourcename, scope='', dbtype=dbtype, host=host,
                                         user=user, password=password, database=database, port=port,
                                         charset=charset)
    db.add(create_datasource)
    db.commit()
    return [datasource, testresult]
@router.post('/v2/update_datasource')
async def create_update_datasource_v2(
        request: Request,
        db: Session = Depends(get_db)
):
    """Update an existing datasource after checking the new settings work.

    The JSON body must contain non-empty ``id``, ``datasourcename``,
    ``dbtype``, ``host``, ``user``, ``password``, ``database``, ``port`` and
    ``charset``. Connectivity is verified by running ``select now()`` with
    the new settings before they are saved.

    :param request: incoming request whose body carries the JSON payload
    :param db: SQLAlchemy session provided by ``get_db``
    :return: ``[datasource_id, testresult]`` on success; a plain error
        string for a missing/empty field, an unsupported ``dbtype`` or a
        failed connectivity check; a 410 ``JSONResponse`` when the
        datasource id does not exist
    """
    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    if len(body) > 0:
        body = json.loads(body)

    # Check the input parameters.
    for param in ['id', 'datasourcename', 'dbtype', 'host', 'user', 'password', 'database', 'port', 'charset']:
        if param not in body or body[param] == '':
            return f'{param}为必填参数,不能不存在'

    # The datasource being updated must already exist.
    datasource = body['id']
    database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == datasource).first()
    if database_info is None:
        return JSONResponse(status_code=410, content={
            'errcode': 410,
            'errmsg': f'数据库{datasource}不存在'
        })

    datasourcename = body['datasourcename']
    dbtype = body['dbtype']
    if dbtype != 'pymysql':
        return f'暂不支持{dbtype}'

    host = body['host']
    user = body['user']
    password = body['password']
    database = body['database']
    port = body['port']
    charset = body['charset']

    # Connectivity check; the connection is closed on every exit path.
    conn = None
    try:
        conn = pymysql.connect(host=host,
                               user=user,
                               password=password,
                               database=database,
                               port=port,
                               charset=charset)
        cur = conn.cursor()
        sql = ' select now()'
        print(sql)
        cur.execute(sql)
        testresult = cur.fetchall()
        cur.close()
    except Exception as e:
        # Any failure is reported back to the caller as text, as before.
        return f"未知错误: {e}"
    finally:
        if conn is not None and conn.open:
            conn.close()

    # Apply the verified settings to the loaded row.
    database_info.name = datasourcename
    database_info.dbtype = dbtype
    database_info.host = host
    database_info.user = user
    database_info.password = password
    database_info.database = database
    database_info.port = port
    database_info.charset = charset
    db.commit()
    return [datasource, testresult]
@router.get('/v2/{serviceid}')
@router.post('/v2/{serviceid}')
async def v2(serviceid: str, request: Request, db: Session = Depends(get_db)):
    """Run the SQL template configured for ``serviceid`` with database-side paging.

    The optional JSON body may carry ``size`` (page size, capped at 100,
    default 10), ``current`` (1-based page number, default 1) and ``query``
    (a mapping whose values replace ``{name}`` placeholders in the stored
    SQL text). Query values are rejected when they contain any character
    flagged by ``contains_special_characters``.

    SECURITY: accepted values are still spliced into the SQL text by string
    replacement; the character filter is the only protection against SQL
    injection here.

    :param serviceid: id of the service to execute (path parameter)
    :param request: incoming request whose body carries the JSON payload
    :param db: SQLAlchemy session provided by ``get_db``
    :return: ``{'code': 0, 'msg', 'rows', 'pages', 'currentPage', 'total',
        'pageSize'}`` on success; a 410 ``JSONResponse`` when the service,
        command, datasource or driver is not usable; a 411 ``JSONResponse``
        when a query value is rejected
    """
    if serviceid is None:
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': "请求头servicecode未传参"
        })

    # The requested service must exist.
    service_info = db.query(ApiServiceEntity).filter(ApiServiceEntity.id == serviceid).first()
    if service_info is None:
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': 'servicecode服务不存在'
        })

    # The service must have a command configured.
    command_info = db.query(CommandEntity).filter(CommandEntity.scope == serviceid).first()
    if command_info is None:
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': '查询命令没配置'
        })

    # The command must point at an existing datasource.
    database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == command_info.datasource).first()
    if database_info is None:
        return JSONResponse(status_code=410, content={
            'errcode': 410,
            'errmsg': '数据库没配置'
        })

    sql = command_info.sqltext
    print('sql ==== << ', sql)

    raw = await request.body()
    body = raw.decode(encoding='utf-8')
    size = 10
    current = 1
    if len(body) > 0:
        body = json.loads(body)

        # Pager settings: page size and page number.
        if "size" in body:
            if isinstance(body['size'], str):
                size = int(body['size'])
            elif isinstance(body['size'], int):
                size = body['size']
            if size > 100:
                size = 100
            if size <= 0:
                # A zero size used to raise ZeroDivisionError in divmod below
                # and a negative one produced an invalid LIMIT clause.
                size = 10
        if "current" in body:
            if isinstance(body['current'], str):
                current = int(body['current'])
            elif isinstance(body['current'], int):
                current = body['current']
            if current <= 0:
                current = 1

        # Substitute the placeholders of the SQL template.
        if 'query' in body:
            for key, val in body['query'].items():
                if isinstance(val, (int, float)):
                    val = str(val)
                if not isinstance(val, str):
                    # None / list / dict values used to crash the regex
                    # search with TypeError; reject them explicitly.
                    return JSONResponse(status_code=411, content={
                        'code': 411,
                        'msg': f'参数{key}类型不支持,仅支持字符串或数字'
                    })
                if contains_special_characters(val):
                    return JSONResponse(status_code=411, content={
                        'code': 411,
                        'msg': f'参数{key}含特殊符号:;、&、$、#、\'、\\t、@、空格等'
                    })
                sql = sql.replace("{" + key + "}", val)
    print('sql ==== >> ', sql)

    # Only the pymysql driver is supported.
    if database_info.dbtype != 'pymysql':
        return JSONResponse(status_code=410, content={
            'code': 410,
            'msg': '接口对应数据库暂不支持'
        })

    conn = pymysql.connect(host=database_info.host,
                           user=database_info.user,
                           password=database_info.password,
                           database=database_info.database,
                           port=database_info.port,
                           charset=database_info.charset)
    try:
        cur = conn.cursor()

        # Count all matching rows to derive the number of pages.
        totalsql = f'select count(*) from ({sql})t'
        print(totalsql)
        cur.execute(totalsql)
        total = cur.fetchone()[0]
        pages, pagesmod = divmod(total, size)
        print(total, pages, pagesmod)
        if pagesmod != 0:
            pages += 1
        print(pages, pagesmod)
        if total < size:
            size = total

        # Fetch the requested page.
        sql = sql + f' limit {size*(current-1)}, {size};'
        print(sql, size)
        cur.execute(sql)
        rows = cur.fetchall()
        colnames = [desc[0] for desc in cur.description]
        cur.close()
    finally:
        # Release the connection even when one of the queries fails.
        if conn.open:
            conn.close()

    data = [dict(zip(colnames, row)) for row in rows]
    return {
        'code': 0,
        'msg': 'success',
        'rows': data,
        'pages': pages,          # total number of pages
        'currentPage': current,  # current page number
        'total': total,          # total number of rows
        'pageSize': size         # rows per page
    }
|