|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends,Response,HTTPException,status,Query
- from fastapi.responses import StreamingResponse
- from fastapi.responses import JSONResponse
- from cachetools.keys import hashkey
- from sqlalchemy.orm import Session
- from typing import List, Optional
- from cachetools import LRUCache
- from pydantic import BaseModel
- from database import get_db
- from urllib import parse
- from models import *
- from utils import *
- import requests
- import hashlib
- import random
- import string
- import json
- import time
- import io
- router = APIRouter()
- def GetSign(signTime,nonce,passtoken):
- data = signTime+passtoken+nonce+signTime
- return hashlib.sha256(data.encode('utf-8')).hexdigest()
- def GetNonce(length):
- characters = string.ascii_letters + string.digits
- # 随机选择字符集的长度个字符
- random_string = ''.join(random.choice(characters) for _ in range(length))
- return random_string
- def GetTime():
- return int(time.time()*1000)
- # 设置缓存大小,例如 100 个缓存项
- cache_size = 10000
- cache = LRUCache(maxsize=cache_size)
- # 缓存过期时间,例如 10 分钟
- cache.ttl = 86400 # ttl(Time to Live) 以秒为单位
- @router.post('/proxyHandler/{city_code:path}/{service_code:path}')
- @router.get('/proxyHandler/{city_code:path}/{service_code:path}')
- async def mine(request: Request,body = Depends(remove_xss_json),db: Session = Depends(get_db)):
- target = str(request.url) # 获取接口地址
- cache_key = hashkey(target)
- cached_response = cache.get(cache_key)
- if cached_response:
- if 'image/png' in cached_response.headers.get('Content-Type', ''):
- return StreamingResponse(content=io.BytesIO(cached_response.content), media_type='image/png')
- return cached_response
- # 获取接口id 判断接口是否存在
- service_code = request.path_params.get('service_code')
- service_info = db.query(OneShareApiEntity).filter(OneShareApiEntity.servercode==service_code).first()
- if service_info is None:
- return JSONResponse(status_code=410, content={
- 'code': 410,
- 'msg': f'server_code{service_code}服务不存在'
- })
- city_code = request.path_params.get('city_code')
- if city_code == "mm":
- url = 'https://19.155.242.125/GatewayMsg/http/api/proxy/invoke'
- elif city_code == "gd":
- url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
- else:
- return JSONResponse(status_code=410, content={
- 'code': 410,
- 'msg': f'city_code{city_code}不存在'
- })
- # 获取请求方式
- method = request.method
- # 获取请求体
- # body = await request.body()
- # body = body.decode(encoding='utf-8')
- # if len(body) > 0:
- # body = json.loads(body)
- # 获取默认params 1
- params_default = service_info.params_default
- if len(params_default)>0:
- print(params_default)
- params_default = json.loads(params_default)
- # 获取params
- query_list = parse.parse_qsl(str(request.query_params))
- params = {}
- for (key, val) in query_list:
- params[key]=val
- params_default.update(params)
- params = params_default
- # 生成请求头主体
- signTime = str(GetTime()//1000)
- nonce = GetNonce(5)
- sign = GetSign(signTime,nonce,service_info.passtoken)
- # 初始请求头
- headers = {
- # 'Content-Type': 'application/json',
- 'x-tif-signature': sign,
- 'x-tif-timestamp': signTime,
- 'x-tif-nonce': nonce,
- 'x-tif-paasid': service_info.passid,
- 'x-tif-serviceId': service_code
- }
- # 加入默认请求头
- headers_default = service_info.headers_default
- if len(headers_default)>0:
- headers_default = json.loads(headers_default)
- headers.update(headers_default)
- # 判断接口类型
- # 1 普通接口 请求头请求体用默认,前端传输的请求体嵌入到请求体query中
- # 2 地图接口
- # 3 自定义接口
- if service_info.servertype == 1:
- query_timestamp = str(GetTime())
- data = {
- "system_id": service_info.passid,
- "vender_id": 'xx',
- "department_id": 'xx',
- "query_timestamp": query_timestamp,
- "UID": GetNonce(5),
- "query": body,
- "audit_info": {
- "operator_id": 'xx',
- "operator_name": 'xx',
- "query_object_id": 'xx',
- "query_object_id_type": 'xx',
- "item_id": 'xx',
- "item_code": 'xx',
- "item_sequence": 'xx',
- "terminal_info": 'xx',
- "query_timestamp": query_timestamp
- }
- }
- body=data
- else:
- body_default = service_info.body_default
- if len(body_default)>0:
- body_default = json.loads(body_default)
- body_default.update(body)
- # 根据请求方式请求获取数据
- if method == "GET":
- response = requests.get(url=url,params=params,headers=headers,json=body,verify=False)
- elif method == "POST":
- response = requests.post(url=url,params=params,headers=headers,json=body,verify=False)
- else:
- return JSONResponse(status_code=410, content={
- 'code': 410,
- 'msg': f'请求方式{method}不支持'
- })
- # 根据响应头数据类型返回对应数据类型
- content_type = response.headers.get('Content-Type', '')
- if 'application/json' in content_type:
- return JSONResponse(content=response.json(), media_type='application/json',status_code=response.status_code)
- elif 'application/xml' in content_type or 'text/xml' in content_type:
- return Response(content=response.text, media_type='application/xml',status_code=response.status_code)
- elif 'text/html' in content_type:
- return Response(content=response.text, media_type='application/html',status_code=response.status_code)
- elif 'image/png' in content_type:
- if response.status_code==200:
- cache[cache_key] = response
- return StreamingResponse(content=io.BytesIO(response.content), media_type='image/png')
- # 可以继续添加更多的条件分支来处理其他类型
- else:
- return Response(content=response.text, media_type=content_type)
- class OneShareApiCreateForm(BaseModel):
- passid: str
- passtoken: str
- servercode: str
- servertype: int
- params_default: str = ""
- body_default: str = ""
- headers_default: str = ""
- servername: str
- @router.post("/create")
- async def create_one_share_api(
- api_data: OneShareApiCreateForm,
- db: Session = Depends(get_db)
- ):
- try:
- # 创建一个新的 OneShareApiEntity 实例
- new_api = OneShareApiEntity(
- passid=api_data.passid,
- passtoken=api_data.passtoken,
- servercode=api_data.servercode,
- servertype=api_data.servertype,
- params_default=api_data.params_default,
- body_default=api_data.body_default,
- headers_default=api_data.headers_default,
- servername=api_data.servername
- )
- # 添加到数据库会话并提交
- db.add(new_api)
- db.commit()
- db.refresh(new_api) # 刷新实例以包含新的 ID 等信息
- # 构建并返回响应
- return {
- "code": 200,
- "msg": "操作成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
- class OneShareApiUpdateForm(BaseModel):
- passid: str = None
- passtoken: str = None
- servercode: str = None
- servertype: int = None
- params_default: str = None
- body_default: str = None
- headers_default: str = None
- servername: str = None
- @router.put("/update/{id}") # 或者使用 @app.patch 如果你只想更新部分字段
- async def update_one_share_api(
- id: int,
- api_data: OneShareApiUpdateForm,
- db: Session = Depends(get_db)
- ):
- try:
- # 从数据库中获取现有的 OneShareApiEntity 实例
- api = db.query(OneShareApiEntity).filter(OneShareApiEntity.id == id).first()
- if not api:
- raise HTTPException(status_code=404, detail="接口不存在")
- # 更新字段
- if api_data.passid:
- api.passid = api_data.passid
- if api_data.passtoken:
- api.passtoken = api_data.passtoken
- if api_data.servercode:
- api.servercode = api_data.servercode
- if api_data.servertype:
- api.servertype = api_data.servertype
- if api_data.params_default:
- api.params_default = api_data.params_default
- if api_data.body_default:
- api.body_default = api_data.body_default
- if api_data.headers_default:
- api.headers_default = api_data.headers_default
- if api_data.servername:
- api.servername = api_data.servername
- # 更新时间
- api.update_time = datetime.now()
- # 提交更改
- db.commit()
- # 构建并返回响应
- return {
- "code": 200,
- "msg": "操作成功",
- "data": None
- }
- except Exception as e:
- # 处理异常
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
- @router.delete("/delete/{id}") # 使用 ID 来标识要删除的接口
- async def delete_one_share_api(
- id: int,
- db: Session = Depends(get_db)
- ):
- # try:
- # 从数据库中获取要删除的 OneShareApiEntity 实例
- api = db.query(OneShareApiEntity).filter(OneShareApiEntity.id == id).first()
- if api is None:
- raise HTTPException(status_code=404, detail="接口不存在")
- # 删除实例
- db.delete(api)
- db.commit()
- # 构建并返回响应
- return {
- "code": 200,
- "msg": "操作成功",
- "data": None
- }
- # except Exception as e:
- # # 处理异常
- # print(e)
- # raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
- class OneShareApiOut(BaseModel):
- id: int
- passid: str
- passtoken: str
- servercode: str
- servertype: int
- params_default: str
- body_default: str
- headers_default: str
- servername: str
- create_time: str
- update_time: str
- # 定义查询接口的路由
- @router.get("/list")
- async def get_one_share_apis(
- page: int = Query(1, gt=0),
- page_size: int = Query(10, gt=0),
- servername: Optional[str] = Query(None),
- servertype:int = Query(None),
- servercode: Optional[str] = Query(None),
- db: Session = Depends(get_db)
- ):
- try:
- # 构建查询
- query = db.query(OneShareApiEntity)
- # 应用查询参数
- if servername:
- query = query.filter(OneShareApiEntity.servername.contains(servername))
- if servercode:
- query = query.filter(OneShareApiEntity.servercode.contains(servercode))
- if servertype:
- query = query.filter(OneShareApiEntity.servertype==servertype)
- # 获取总记录数
- total_count = query.count()
- # 执行分页查询
- items = query.offset((page - 1) * page_size).limit(page_size).all()
- # 将查询结果转换为 Pydantic 模型列表
- apis_out = [
- OneShareApiOut(
- id=item.id,
- passid=item.passid,
- passtoken=item.passtoken,
- servercode=item.servercode,
- servertype=item.servertype,
- params_default=item.params_default,
- body_default=item.body_default,
- headers_default=item.headers_default,
- servername=item.servername,
- create_time=item.create_time.strftime('%Y-%m-%d %H:%M:%S'),
- update_time=item.update_time.strftime('%Y-%m-%d %H:%M:%S')
- ) for item in items
- ]
- # 构建并返回响应
- return {
- "code": 200,
- "msg": "查询成功",
- "data": {
- "total": total_count,
- "list": apis_out
- }
- }
- except Exception as e:
- # 处理异常
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|