#!/usr/bin/env python3 # -*- coding: utf-8 -*- from re import S from fastapi import APIRouter, Request, Depends, Form, Body, File, UploadFile, BackgroundTasks from fastapi.responses import Response from fastapi.responses import JSONResponse from database import get_db from sqlalchemy.orm import Session from utils import * from models import * from urllib import parse from common.DBgetdata import dbgetdata from pprint import pprint from datetime import datetime # import pandas as pd from pydantic import BaseModel import sqlalchemy import pymysql import json,time,io import time, os import math import uuid import re from . import sign_api router = APIRouter() router.include_router(sign_api.router) def contains_special_characters(input_string, special_characters=";|&|$|#|'|@| "): """ 判断字符串是否包含特殊符号。 :param input_string: 需要检查的字符串 :param special_characters: 特殊符号的字符串,多个符号用竖线 '|' 分隔 :return: 如果包含特殊符号返回 True,否则返回 False """ # 创建正则表达式模式 pattern = re.compile('[' + re.escape(special_characters) + ']') # 搜索字符串中的特殊符号 if pattern.search(input_string): return True return False @router.get('/v1/demo') @router.post('/v1/demo') async def test1(request: Request): data = await request.body() body = data.decode(encoding='utf-8') # print(body) if len(body) > 0: body = json.loads(body) print(body) print([body,{'msg':'good'}]) return body else: return body @router.post('/create_api_service') async def create_api_service( request: Request, db: Session = Depends(get_db) ): # print(1) data = await request.body() body = data.decode(encoding='utf-8') if len(body) > 0: # print(body) body = json.loads(body) serviceid = str(uuid.uuid4()).replace("-","") serviname = body['servicename'] datasource = str(uuid.uuid1()) sqlname = body['sqlname'] id = str(uuid.uuid4()) dbtype = body['dbtype'] if body['dbtype'] == 'pymysql': host = body['host'] user = body['user'] password = body['password'] database = body['database'] port = body['port'] charset = body['charset'] create_datasource = DatasourceEntity(id= datasource,name=sqlname,scope=serviceid,dbtype=dbtype,host=host,user=user,password=password,database=database,port=port,charset=charset) # db.add() # print() db.add(create_datasource) sqltext = body['sqltext'] create_command = CommandEntity(id=id,sqltext=sqltext,datasource=datasource,scope=serviceid) create_api = ApiServiceEntity(id=serviceid,name=serviname) db.add(create_api) db.add(create_command) db.commit() return serviceid @router.get('/v1/{service_code:path}') @router.post('/v1/{service_code:path}') async def v1(request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db)): service_code = request.path_params.get('service_code') query_list = parse.parse_qsl(str(request.query_params)) print(query_list) data = await request.body() body = data.decode(encoding='utf-8') if len(body) > 0: print('[body]', body) body = json.loads(body) # print("1111",body) service_info = db.query(ApiServiceEntity).filter(ApiServiceEntity.id == service_code).first() if service_info is None: return { 'errcode': 1, 'errmsg': '服务不存在' } # print(service_info) print('service_name', service_info.name) # print('database_name', database_info.name, database_info.dsn) command_info = db.query(CommandEntity).filter(CommandEntity.scope == service_code).first() if command_info is None: return { 'errcode': 1, 'errmsg': '查询命令没配置' } database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == command_info.datasource).first() if database_info is None: return { 'errcode': 1, 'errmsg': '数据库没配置' } sql = command_info.sqltext print('sql ==== << ', sql) meta_data = {} # 从query获取参数 for (key, val) in query_list: if key != 'access_token' and key !='page' and key != 'limit' and key !='create_time' and key != 'record_name'and key != 'record_cid': sql = sql.replace("{" + key + "}", val.replace('全部','')) elif key =='create_time': if val != '全部': sql = sql.replace("{" + key + "}", ('DATE_FORMAT(create_time,\'%Y-%m-%d\') =\''+val+'\' and')) #DATE_FORMAT(create_time,'%Y-%m-%d') ='{create_time}' else: sql = sql.replace("{" + key + "}", '') elif key =='record_name' or key =='record_cid' : if val != '全部': sql = sql.replace("{" + key + "}", (key+'=\''+val+'\' and')) else: sql = sql.replace("{" + key + "}", '') # elif key =='record_cid': # if val != '全部': # sql = sql.replace("{" + key + "}", ('record_name =\''+val+'\' and')) # else: # print(111) # sql = sql.replace("{" + key + "}", '') elif key =='page' : meta_data['page'] = int(val) elif key == 'limit': meta_data['limit'] = int(val) # 从json body获取参数 if len(body) > 0: print(body) for (key, val) in body.items(): if key == 'page1' and 'limit1' in body: if val==-1: pass if val == '': val = '0' page1 = int(val) limit1=int(body['limit1']) print(str((page1-1)*limit1)) sql = sql.replace('{page1}',str((page1-1)*limit1)) # elif key == 'limit1' and 'page1' in body: # page1 = int(body['page1']) # limit1 = int(val) # sql.replace('{page1}', str((page1-1)*limit1)) elif key !='page' and key != 'limit' and key !='create_time': if isinstance(val, int): val = str(val) if isinstance(val, float): val = str(val) sql = sql.replace("{" + key + "}", val) elif key =='create_time': if val != '全部': sql = sql.replace("{" + key + "}", ('DATE_FORMAT(create_time,\'%Y-%m-%d\') =\''+val+'\' and')) #DATE_FORMAT(create_time,'%Y-%m-%d') ='{create_time}' else: sql = sql.replace("{" + key + "}", '') elif key =='page' : if isinstance(val, str): meta_data['page'] = int(val) elif isinstance(val, int): meta_data['page'] = val elif key == 'limit': if isinstance(val, str): meta_data['limit'] = int(val) elif isinstance(val, int): meta_data['limit'] = val elif key == 'taskNo' and val=='': print("他们又输入空的taskNo啦!") return [{"text":"兄弟,taskNo不能为空"}] print('sql ==== >> ', sql) data = [] if database_info.dbtype == 'psycopg2': ''' print(1111) conn = psycopg2.connect(database_info.dsn) cur = conn.cursor() cur.execute(sql) rows = cur.fetchall() # 字段名列表 colnames = [desc[0] for desc in cur.description] item = {} for row in rows: for col in range(len(colnames)): field_name = colnames[col] item[field_name] = row[col] data.append(item) conn.close() ''' elif database_info.dbtype == 'pymysql': # print(database_info) conn = pymysql.connect(host=database_info.host, user=database_info.user, password=database_info.password, database=database_info.database, port=database_info.port, charset=database_info.charset) cur = conn.cursor() cur.execute(sql) rows = cur.fetchall() # 字段名列表 colnames = [desc[0] for desc in cur.description] # print(colnames) # print(rows) pages = 1 #总页数 current = 1 #第几页 total = len(rows) size = len(rows) # print(size) if 'page' in meta_data and 'limit' in meta_data: current = meta_data['page'] size = meta_data['limit'] if (current == 0 or size == 0) and total != 0: current = 1 size = 5 pages = total//size if total%size !=0: pages+=1 start_index = (current-1)*size end_index = current*size if pages <= current : # current = pages if total == size : end_index = (current-1)*size+total elif total%size == 0: end_index = current*size else: end_index = (current-1)*size+total%size start_index = (current-1)*size if total ==0: start_index = end_index =0 # print(start_index,end_index) for row in range(start_index,end_index): item = {} for col in range(len(colnames)): field_name = colnames[col] item[field_name] = rows[row][col] data.append(item) # print(item) # for row in rows: # item = {} # for col in range(len(colnames)): # field_name = colnames[col] # item[field_name] = row[col] # data.append(item) # print(item) # print(data) conn.close() return { 'code': 0, 'errcode': 0, 'errmsg': '查询成功', 'data': {"list":data, 'pages':pages, # 总页数 'currentPage':current, #当前页数 # 'current':current, # 'total' : total, 'total' : total, # 总数据量 # 'size':size, 'pageSize':size #页码 } } # import time # print(time) # if 'ispages' in body: # try: # if body['ispages']=='1': # return { # 'page': [{"total": pages, # "page": current}], # "data":data # } # # data['pages'] = # except: # pass # # #background_tasks.add_task(post_service_method, service_code, body, db) # # return data class CreateApiServiceFrom(BaseModel): datasource:str name:str sqltext:str serviceid:str=None @router.post('/v2/create_api_service') async def create_api_service_v2( form_data: CreateApiServiceFrom, request: Request, db: Session = Depends(get_db) ): # print(1) data = await request.body() body = data.decode(encoding='utf-8') if len(body) > 0: # print(body) body = json.loads(body) for param in ['datasource','name','sqltext']: if param not in body or body[param]=='': return f'{param}为必填参数,不能不存在' # 数据库信息校验 datasource = form_data.datasource database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == datasource).first() if database_info is None: return JSONResponse(status_code=410, content={ 'errcode': 410, 'errmsg': f'数据库-{datasource}-不存在' }) # 接口信息生成及获取 if form_data.serviceid is None: serviceid = str(uuid.uuid4()).replace("-","") else: serviceid = form_data.serviceid serviname = form_data.name id = str(uuid.uuid4()) sqltext = form_data.sqltext # 接口执行校验 if database_info.dbtype == 'pymysql': try: conn = pymysql.connect(host=database_info.host, user=database_info.user, password=database_info.password, database=database_info.database, port=database_info.port, charset=database_info.charset) cur = conn.cursor() # 测试返回 sql = sqltext+' limit 0;' print(sql) cur.execute(sql) colnames = [desc[0] for desc in cur.description] cur.close() conn.close() except Exception as e: # 捕获其他所有类型的异常 return f"未知错误: {e}" else: return '暂不支持除pymysql以外的驱动' create_command = CommandEntity(id=id,sqltext=sqltext,datasource=datasource,scope=serviceid) create_api = ApiServiceEntity(id=serviceid,name=serviname,status=1) db.add(create_api) db.add(create_command) db.commit() return [serviceid,colnames] @router.post('/v2/update_api_service') async def update_api_service_v2( request: Request, db: Session = Depends(get_db) ): # print(1) data = await request.body() body = data.decode(encoding='utf-8') if len(body) > 0: # print(body) body = json.loads(body) for param in ['id','datasource','scope','name','sqltext','status']: if param not in body or body[param]=='': return f'{param}为必填参数,不能不存在' # 数据库信息校验 datasource = body['datasource'] database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == datasource).first() if database_info is None: return JSONResponse(status_code=410, content={ 'errcode': 410, 'errmsg': f'数据库-{datasource}-不存在' }) serviceid = body['scope'] # 判断请求接口是否存在 service_info = db.query(ApiServiceEntity).filter(ApiServiceEntity.id == serviceid).first() if service_info is None: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': f'servicecode{serviceid}服务不存在' }) command_info = db.query(CommandEntity).filter(CommandEntity.scope == serviceid).first() if command_info is None: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': f'servicecode{serviceid}服务不存在' }) # 接口信息生成及获取 id = body['id'] serviname = body['name'] sqltext = body['sqltext'] status = body['status'] # 接口执行校验 if database_info.dbtype == 'pymysql': try: conn = pymysql.connect(host=database_info.host, user=database_info.user, password=database_info.password, database=database_info.database, port=database_info.port, charset=database_info.charset) cur = conn.cursor() # 测试返回 sql = sqltext+' limit 0;' print(sql) cur.execute(sql) colnames = [desc[0] for desc in cur.description] cur.close() conn.close() except Exception as e: # 捕获其他所有类型的异常 return f"未知错误: {e}" else: return '暂不支持除pymysql以外的驱动' # create_command = CommandEntity(id=id,sqltext=sqltext,datasource=datasource,scope=serviceid) # create_api = ApiServiceEntity(id=serviceid,name=serviname,status=1) # db.add(create_api) # db.add(create_command) service_info.id=serviceid service_info.name=serviname service_info.status=status command_info.id=id command_info.sqltext=sqltext command_info.datasource=datasource command_info.scope=serviceid db.commit() return [serviceid,colnames] @router.post('/v2/create_datasource') async def create_create_datasource_v2( request: Request, db: Session = Depends(get_db) ): # print(1) data = await request.body() body = data.decode(encoding='utf-8') if len(body) > 0: # print(body) body = json.loads(body) # 检查入参 for param in ['datasourcename','dbtype','host','user','password','database','port','charset']: if param not in body or body[param]=='': return f'{param}为必填参数,不能不存在' datasource = str(uuid.uuid1()) datasourcename = body['datasourcename'] dbtype = body['dbtype'] if body['dbtype'] == 'pymysql': host = body['host'] user = body['user'] password = body['password'] database = body['database'] port = body['port'] charset = body['charset'] # 数据库连通性校验 try: conn = pymysql.connect(host=host, user=user, password=password, database=database, port=port, charset=charset) cur = conn.cursor() # 测试返回 sql = ' select now()' print(sql) cur.execute(sql) testresult = cur.fetchall() cur.close() conn.close() except Exception as e: # 捕获其他所有类型的异常 return f"未知错误: {e}" create_datasource = DatasourceEntity(id=datasource, name=datasourcename, scope='', dbtype=dbtype, host=host, user=user, password=password, database=database, port=port, charset=charset) db.add(create_datasource) else: return f'暂不支持{dbtype}' db.commit() return [datasource,testresult] @router.post('/v2/update_datasource') async def create_update_datasource_v2( request: Request, db: Session = Depends(get_db) ): # print(1) data = await request.body() body = data.decode(encoding='utf-8') if len(body) > 0: # print(body) body = json.loads(body) # 检查入参 for param in ['id','datasourcename','dbtype','host','user','password','database','port','charset']: if param not in body or body[param]=='': return f'{param}为必填参数,不能不存在' # 根据数据库id检测是否存在 datasource = body['id'] database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == datasource).first() if database_info is None: return JSONResponse(status_code=410, content={ 'errcode': 410, 'errmsg': f'数据库{datasource}不存在' }) datasourcename = body['datasourcename'] dbtype = body['dbtype'] if body['dbtype'] == 'pymysql': host = body['host'] user = body['user'] password = body['password'] database = body['database'] port = body['port'] charset = body['charset'] # 数据库连通性校验 try: conn = pymysql.connect(host=host, user=user, password=password, database=database, port=port, charset=charset) cur = conn.cursor() # 测试返回 sql = ' select now()' print(sql) cur.execute(sql) testresult = cur.fetchall() cur.close() conn.close() except Exception as e: # 捕获其他所有类型的异常 return f"未知错误: {e}" # create_datasource = DatasourceEntity(id=datasource, name=datasourcename, scope='', dbtype=dbtype, host=host, # user=user, password=password, database=database, port=port, # charset=charset) # db.add(create_datasource) database_info.name = datasourcename database_info.dbtype = dbtype database_info.host = host database_info.user = user database_info.password = password database_info.database = database database_info.port = port database_info.charset = charset else: return f'暂不支持{dbtype}' db.commit() return [datasource,testresult] @router.get('/v2/{serviceid}') @router.post('/v2/{serviceid}') # @router.post('/v2/{servicecode}') async def v2(serviceid:str,request: Request, db: Session = Depends(get_db)): # 获取请求头 servicecode # if service_code is None: # service_code = request.headers.get('servicecode') # print(serviceid) if serviceid is None: return JSONResponse(status_code=410, content= { 'code': 410, 'msg': "请求头servicecode未传参" }) # 判断请求接口是否存在 service_info = db.query(ApiServiceEntity).filter(ApiServiceEntity.id == serviceid).first() if service_info is None: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': 'servicecode服务不存在' }) # print(service_code) # 判断请求接口对应数据库是否存在 command_info = db.query(CommandEntity).filter(CommandEntity.scope == serviceid).first() if command_info is None: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': '查询命令没配置' }) # 判断请求接口对应数据库是否存在 database_info = db.query(DatasourceEntity).filter(DatasourceEntity.id == command_info.datasource).first() if database_info is None: return JSONResponse(status_code=410, content={ 'errcode': 410, 'errmsg': '数据库没配置' }) # 获取接口对应的sql sql = command_info.sqltext print('sql ==== << ', sql) # 从params和body获取参数 # query_list = parse.parse_qsl(str(request.query_params)) # print(query_list) data = await request.body() body = data.decode(encoding='utf-8') size = 10 current = 1 if len(body) > 0: body = json.loads(body) # 分页器 页数和页码的设置 if "size" in body: if isinstance(body['size'], str): size = int(body['size']) elif isinstance(body['size'], int): size = body['size'] if size >100: size = 100 if "current" in body: if isinstance(body['current'], str): current = int(body['current']) elif isinstance(body['current'], int): current = body['current'] if current<=0: current=1 # 接口sql的参数替换 if 'query' in body: for (key, val) in body['query'].items(): if isinstance(val, int): val = str(val) if isinstance(val, float): val = str(val) if contains_special_characters(val): return JSONResponse(status_code=411, content={ 'code': 411, 'msg': f'参数{key}含特殊符号:;、&、$、#、\'、\\t、@、空格等' }) sql = sql.replace("{" + key + "}", val) print('sql ==== >> ', sql) data = [] # 数据库类型为mysql情况下 if database_info.dbtype == 'pymysql': # 数据库连接 conn = pymysql.connect(host=database_info.host, user=database_info.user, password=database_info.password, database=database_info.database, port=database_info.port, charset=database_info.charset) cur = conn.cursor() # 查总数据量,分页数据处理 totalsql = f'select count(*) from ({sql})t' print(totalsql) cur.execute(totalsql) total = cur.fetchone()[0] pages,pagesmod = divmod(total, size) print(total,pages,pagesmod) if pagesmod!=0: pages+=1 print(pages,pagesmod) if total