#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""File upload, delete, listing and display endpoints for the "yst" uploads."""

# NOTE: the import order (including the repeated `import os` and
# `from utils import *`) is intentionally left as it was: a later import can
# rebind a name that an earlier wildcard import shadowed, so removing or
# reordering them is not guaranteed to be a no-op.
from fastapi import APIRouter, Request, Depends, Body, File, UploadFile
from fastapi.responses import FileResponse
from starlette.responses import StreamingResponse
from sqlalchemy.orm import Session
from database import get_db
import requests
import json
import os
import base64
import uuid
import time
from config import settings
from pprint import pprint
from exceptions import *
from utils import *
from extensions import logger
import os
from models import *
from utils import *
from utils.sg_auth import *
from utils.StripTagsHTMLParser import *
from PIL import Image
from io import BytesIO

router = APIRouter()

# Maps a file-type code (as sent by the client) to its display label.
FILE_ALIAS_MAP = {
    'yyzz': '隐患点图片',
    'yjya': '预案附件',
    'ylzj': '演练总结',
}

# Root directory under which every upload is stored.
_UPLOAD_ROOT = '/data/upload/mergefile/uploads/'

# File extensions accepted by the upload endpoint (compared lower-cased).
_ALLOWED_SUFFIXES = ('.jpg', '.jpeg', '.png', '.pdf')

# URL handed back to clients for each stored file; `{}` is the file name.
_SHOW_URL_TEMPLATE = "/mmh5_yjxm_yst/ebus/yst_mmzhyj/api/yst/file/show?thumb=&file_name={}"


def _resolve_upload_path(*parts):
    """Return the absolute path of ``parts`` joined under the "yst" upload dir.

    Returns ``None`` when the normalised result is not strictly inside that
    directory (for example because a part contains ``..``, is absolute, or
    all parts are empty). Callers pass client-supplied values here, so this
    check is what keeps requests from reaching files outside the upload tree.

    The path is normalised with ``os.path.abspath`` (symlinks are not
    resolved) so that it matches the form stored in ``save_filepath``.
    """
    base = os.path.abspath(os.path.join(_UPLOAD_ROOT, "yst"))
    candidate = os.path.abspath(os.path.join(base, *parts))
    if not candidate.startswith(base + os.sep):
        return None
    return candidate


@router.post('/upload')
async def upload(
        *,
        request: Request,
        uuid: str = Body(...),
        fileType: str = Body(...),
        file: UploadFile = File(...),
        ext_info: dict = Depends(yst_pass_ext),
        db: Session = Depends(get_db)
):
    """Store an uploaded file on disk and record it in the database.

    The file is written to ``<upload root>/yst/<uuid>/<fileType>/<generated
    name>``. Only ``.jpg``, ``.jpeg``, ``.png`` and ``.pdf`` extensions are
    accepted; a file name without an extension is stored as ``.png``.

    Returns:
        ``yst_image_response`` wrapping ``fileName`` and ``basePath`` on
        success. When the extension is not allowed, or ``uuid``/``fileType``
        would place the file outside the upload tree, a plain dict with
        empty ``fileName`` and ``basePath`` is returned and nothing is saved.
    """
    # NOTE(review): ext_info is indexed like a mapping; presumably
    # yst_pass_ext returns a dict containing 'cid' -- confirm.
    sfzh = ext_info['cid']
    timestamp = int(time.time())

    # The client may omit the file name entirely; treat that as "no suffix".
    original_name = file.filename or ''
    logger.debug("fileupload {}", original_name)

    # Validate the file suffix.
    suffix = os.path.splitext(original_name)[-1]
    if suffix:
        if suffix.lower() not in _ALLOWED_SUFFIXES:
            return {
                'fileName': '',
                'basePath': ''
            }
    else:
        suffix = '.png'

    # `uuid` and `fileType` come from the request body, so make sure they
    # cannot steer the write outside the upload tree.
    file_dir = _resolve_upload_path(uuid, fileType)
    if file_dir is None:
        return {
            'fileName': '',
            'basePath': ''
        }

    file_name = get_filename(suffix)
    os.makedirs(file_dir, mode=0o764, exist_ok=True)
    file_path = os.path.join(file_dir, file_name)

    content = await file.read()
    # 'wb' truncates any existing file, so no prior removal is needed.
    with open(file_path, 'wb') as f:
        f.write(content)
    logger.debug("fileupload {}", file_path)

    file_desc = FILE_ALIAS_MAP.get(fileType, '')
    db_entity = YssYstUploadFileEntity(
        uuid=uuid,
        sfzh=sfzh,
        file_name=file_name,
        save_filepath=file_path,
        file_type=fileType,
        file_desc=file_desc,
        created_time=timestamp,
    )
    db.add(db_entity)
    db.commit()

    path = "{}/{}/{}".format(uuid, fileType, file_name)
    resp = {
        'fileName': file_name,
        'basePath': path
    }
    return yst_image_response(resp)


def get_filename(suffix):
    """Return a new unique file name: a UUID1 string plus the lower-cased suffix."""
    return str(uuid.uuid1()) + suffix.lower()


@router.post('/delete')
def delete(
        *,
        request: Request,
        param: dict = Depends(yst_request_param),
        db: Session = Depends(get_db)
):
    """Delete an uploaded file and its database row.

    Reads ``path`` from the request parameters, relative to the "yst" upload
    directory. The file and the matching ``YssYstUploadFileEntity`` row are
    removed only when the path resolves to an existing regular file inside
    that directory; otherwise nothing is changed.

    Returns:
        ``yst_response({'ret': 0})`` in every case.
    """
    path = get_req_param(param, 'path')
    # `path` is client-supplied: refuse anything that escapes the upload tree.
    file_path = _resolve_upload_path(path)
    if file_path is not None and os.path.isfile(file_path):
        logger.debug("delete file: {}", file_path)
        os.remove(file_path)
        db.query(YssYstUploadFileEntity).filter(
            YssYstUploadFileEntity.save_filepath == file_path
        ).delete()
        db.commit()
    return yst_response({'ret': 0})


# NOTE: this module defines two handlers named `show` (POST below, GET further
# down). Both routes are registered by their decorators, but the module-level
# name ends up bound to the GET handler. The names are left untouched because
# FastAPI derives route names / operation ids from them.
@router.post('/show')
def show(
        *,
        request: Request,
        param: dict = Depends(yst_request_param),
        db: Session = Depends(get_db)
):
    """List the files stored for one ``uuid``, grouped by file type.

    Each sub-directory of ``<upload root>/yst/<uuid>`` is treated as a file
    type. Files of type ``sfz_zm`` / ``sfz_bm`` are merged into a single
    ``sfzFile`` entry; every other type becomes an entry in ``qtFile`` labelled
    via ``FILE_ALIAS_MAP`` (empty label when the type is not in the map).

    Returns:
        ``yst_response`` wrapping ``{'sfzFile': [...], 'qtFile': [...]}``.
        Both lists are empty when the directory does not exist or ``uuid``
        would resolve outside the upload tree.
    """
    record_uuid = get_req_param(param, 'uuid')
    upload_dir = _resolve_upload_path(str(record_uuid))
    logger.debug("upload dir {}", upload_dir)

    sfz_urls = []
    other_files = []
    if upload_dir is not None and os.path.isdir(upload_dir):
        for file_type in os.listdir(upload_dir):
            type_dir = os.path.join(upload_dir, file_type)
            if not os.path.isdir(type_dir):
                continue
            urls = [
                _SHOW_URL_TEMPLATE.format(file_name)
                for file_name in os.listdir(type_dir)
            ]
            if file_type in ('sfz_zm', 'sfz_bm'):
                sfz_urls.extend(urls)
            else:
                # upload() accepts file types that are not in the alias map,
                # so fall back to an empty label instead of raising KeyError.
                other_files.append({
                    'fileName': FILE_ALIAS_MAP.get(file_type, ''),
                    'imageUrls': urls,
                })

    sfz_file = []
    if sfz_urls:
        sfz_file = [{'fileName': '居民身份证', 'imageUrls': sfz_urls}]

    return yst_response({
        'sfzFile': sfz_file,
        'qtFile': other_files
    })


@router.get('/show', response_class=FileResponse, summary="显示图片")
async def show(
        *,
        request: Request,
        file_name: str,
        thumb: str = '',
        db: Session = Depends(get_db)
):
    """Serve a stored file looked up by its generated ``file_name``.

    When ``thumb`` is non-empty the image is scaled to fit within 64x64 and
    returned as PNG; otherwise the stored file is returned as-is. If there is
    no database row for ``file_name``, or the recorded file is missing on
    disk, ``blank.png`` from ``settings.UPLOAD_IMAGE_PATH`` is served instead.
    """
    row = db.query(YssYstUploadFileEntity).filter(
        YssYstUploadFileEntity.file_name == file_name
    ).first()

    image_filepath = row.save_filepath if row is not None else None
    logger.debug("file show lookup {} -> {}", file_name, image_filepath)
    if not image_filepath or not os.path.exists(image_filepath):
        # Unknown name or file gone from disk: serve the placeholder rather
        # than falling through with no response.
        return FileResponse(os.path.join(settings.UPLOAD_IMAGE_PATH, "blank.png"))

    logger.debug("file show {} {}", row.file_type, image_filepath)
    if thumb:
        buf = BytesIO()
        # The context manager closes the underlying file handle.
        with Image.open(image_filepath) as image:
            image.thumbnail((64, 64))
            image.save(buf, 'png')
        buf.seek(0)
        return StreamingResponse(buf, media_type="image/png")

    return FileResponse(image_filepath)