#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends,Response,HTTPException,status,Query from fastapi.responses import StreamingResponse from fastapi.responses import JSONResponse from cachetools.keys import hashkey from sqlalchemy.orm import Session from typing import List, Optional from cachetools import LRUCache from pydantic import BaseModel from database import get_db from urllib import parse from models import * from utils import * import requests import hashlib import random import string import json import time import io router = APIRouter() def GetSign(signTime,nonce,passtoken): data = signTime+passtoken+nonce+signTime return hashlib.sha256(data.encode('utf-8')).hexdigest() def GetNonce(length): characters = string.ascii_letters + string.digits # 随机选择字符集的长度个字符 random_string = ''.join(random.choice(characters) for _ in range(length)) return random_string def GetTime(): return int(time.time()*1000) # 设置缓存大小,例如 100 个缓存项 cache_size = 10000 cache = LRUCache(maxsize=cache_size) # 缓存过期时间,例如 10 分钟 cache.ttl = 86400 # ttl(Time to Live) 以秒为单位 @router.post('/proxyHandler/{city_code:path}/{service_code:path}') @router.get('/proxyHandler/{city_code:path}/{service_code:path}') async def mine(request: Request,body = Depends(remove_xss_json),db: Session = Depends(get_db)): target = str(request.url) # 获取接口地址 cache_key = hashkey(target) cached_response = cache.get(cache_key) if cached_response: if 'image/png' in cached_response.headers.get('Content-Type', ''): return StreamingResponse(content=io.BytesIO(cached_response.content), media_type='image/png') return cached_response # 获取接口id 判断接口是否存在 service_code = request.path_params.get('service_code') service_info = db.query(OneShareApiEntity).filter(OneShareApiEntity.servercode==service_code).first() if service_info is None: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': f'server_code{service_code}服务不存在' }) city_code = request.path_params.get('city_code') if city_code == "mm": url = 'https://19.155.242.125/GatewayMsg/http/api/proxy/invoke' elif city_code == "gd": url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke' else: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': f'city_code{city_code}不存在' }) # 获取请求方式 method = request.method # 获取请求体 # body = await request.body() # body = body.decode(encoding='utf-8') # if len(body) > 0: # body = json.loads(body) # 获取默认params 1 params_default = service_info.params_default if len(params_default)>0: print(params_default) params_default = json.loads(params_default) # 获取params query_list = parse.parse_qsl(str(request.query_params)) params = {} for (key, val) in query_list: params[key]=val params_default.update(params) params = params_default # 生成请求头主体 signTime = str(GetTime()//1000) nonce = GetNonce(5) sign = GetSign(signTime,nonce,service_info.passtoken) # 初始请求头 headers = { # 'Content-Type': 'application/json', 'x-tif-signature': sign, 'x-tif-timestamp': signTime, 'x-tif-nonce': nonce, 'x-tif-paasid': service_info.passid, 'x-tif-serviceId': service_code } # 加入默认请求头 headers_default = service_info.headers_default if len(headers_default)>0: headers_default = json.loads(headers_default) headers.update(headers_default) # 判断接口类型 # 1 普通接口 请求头请求体用默认,前端传输的请求体嵌入到请求体query中 # 2 地图接口 # 3 自定义接口 if service_info.servertype == 1: query_timestamp = str(GetTime()) data = { "system_id": service_info.passid, "vender_id": 'xx', "department_id": 'xx', "query_timestamp": query_timestamp, "UID": GetNonce(5), "query": body, "audit_info": { "operator_id": 'xx', "operator_name": 'xx', "query_object_id": 'xx', "query_object_id_type": 'xx', "item_id": 'xx', "item_code": 'xx', "item_sequence": 'xx', "terminal_info": 'xx', "query_timestamp": query_timestamp } } body=data else: body_default = service_info.body_default if len(body_default)>0: body_default = json.loads(body_default) body_default.update(body) # 根据请求方式请求获取数据 if method == "GET": response = requests.get(url=url,params=params,headers=headers,json=body,verify=False) elif method == "POST": response = requests.post(url=url,params=params,headers=headers,json=body,verify=False) else: return JSONResponse(status_code=410, content={ 'code': 410, 'msg': f'请求方式{method}不支持' }) # 根据响应头数据类型返回对应数据类型 content_type = response.headers.get('Content-Type', '') if 'application/json' in content_type: return JSONResponse(content=response.json(), media_type='application/json',status_code=response.status_code) elif 'application/xml' in content_type or 'text/xml' in content_type: return Response(content=response.text, media_type='application/xml',status_code=response.status_code) elif 'text/html' in content_type: return Response(content=response.text, media_type='application/html',status_code=response.status_code) elif 'image/png' in content_type: if response.status_code==200: cache[cache_key] = response return StreamingResponse(content=io.BytesIO(response.content), media_type='image/png') # 可以继续添加更多的条件分支来处理其他类型 else: return Response(content=response.text, media_type=content_type) class OneShareApiCreateForm(BaseModel): passid: str passtoken: str servercode: str servertype: int params_default: str = "" body_default: str = "" headers_default: str = "" servername: str @router.post("/create") async def create_one_share_api( api_data: OneShareApiCreateForm, db: Session = Depends(get_db) ): try: # 创建一个新的 OneShareApiEntity 实例 new_api = OneShareApiEntity( passid=api_data.passid, passtoken=api_data.passtoken, servercode=api_data.servercode, servertype=api_data.servertype, params_default=api_data.params_default, body_default=api_data.body_default, headers_default=api_data.headers_default, servername=api_data.servername ) # 添加到数据库会话并提交 db.add(new_api) db.commit() db.refresh(new_api) # 刷新实例以包含新的 ID 等信息 # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) class OneShareApiUpdateForm(BaseModel): passid: str = None passtoken: str = None servercode: str = None servertype: int = None params_default: str = None body_default: str = None headers_default: str = None servername: str = None @router.put("/update/{id}") # 或者使用 @app.patch 如果你只想更新部分字段 async def update_one_share_api( id: int, api_data: OneShareApiUpdateForm, db: Session = Depends(get_db) ): try: # 从数据库中获取现有的 OneShareApiEntity 实例 api = db.query(OneShareApiEntity).filter(OneShareApiEntity.id == id).first() if not api: raise HTTPException(status_code=404, detail="接口不存在") # 更新字段 if api_data.passid: api.passid = api_data.passid if api_data.passtoken: api.passtoken = api_data.passtoken if api_data.servercode: api.servercode = api_data.servercode if api_data.servertype: api.servertype = api_data.servertype if api_data.params_default: api.params_default = api_data.params_default if api_data.body_default: api.body_default = api_data.body_default if api_data.headers_default: api.headers_default = api_data.headers_default if api_data.servername: api.servername = api_data.servername # 更新时间 api.update_time = datetime.now() # 提交更改 db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) @router.delete("/delete/{id}") # 使用 ID 来标识要删除的接口 async def delete_one_share_api( id: int, db: Session = Depends(get_db) ): # try: # 从数据库中获取要删除的 OneShareApiEntity 实例 api = db.query(OneShareApiEntity).filter(OneShareApiEntity.id == id).first() if api is None: raise HTTPException(status_code=404, detail="接口不存在") # 删除实例 db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } # except Exception as e: # # 处理异常 # print(e) # raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) class OneShareApiOut(BaseModel): id: int passid: str passtoken: str servercode: str servertype: int params_default: str body_default: str headers_default: str servername: str create_time: str update_time: str # 定义查询接口的路由 @router.get("/list") async def get_one_share_apis( page: int = Query(1, gt=0), page_size: int = Query(10, gt=0), servername: Optional[str] = Query(None), servertype:int = Query(None), servercode: Optional[str] = Query(None), db: Session = Depends(get_db) ): try: # 构建查询 query = db.query(OneShareApiEntity) # 应用查询参数 if servername: query = query.filter(OneShareApiEntity.servername.contains(servername)) if servercode: query = query.filter(OneShareApiEntity.servercode.contains(servercode)) if servertype: query = query.filter(OneShareApiEntity.servertype==servertype) # 获取总记录数 total_count = query.count() # 执行分页查询 items = query.offset((page - 1) * page_size).limit(page_size).all() # 将查询结果转换为 Pydantic 模型列表 apis_out = [ OneShareApiOut( id=item.id, passid=item.passid, passtoken=item.passtoken, servercode=item.servercode, servertype=item.servertype, params_default=item.params_default, body_default=item.body_default, headers_default=item.headers_default, servername=item.servername, create_time=item.create_time.strftime('%Y-%m-%d %H:%M:%S'), update_time=item.update_time.strftime('%Y-%m-%d %H:%M:%S') ) for item in items ] # 构建并返回响应 return { "code": 200, "msg": "查询成功", "data": { "total": total_count, "list": apis_out } } except Exception as e: # 处理异常 raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))