#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status from common.security import valid_access_token from sqlalchemy.orm import Session from sqlalchemy.sql import func from common.auth_user import * from sqlalchemy import text from pydantic import BaseModel from common.BigDataCenterAPI import * from database import get_db from typing import List from utils import * from utils.risk import * import json import traceback router = APIRouter() @router.post('/get_max_forest') async def mine(request: Request,db: Session = Depends(get_db)): try: body = await request.json() result = get_max_forest_level(db) #print(result) result = get_warning_description(result) return { "code": 200, "msg": "成功", "data": {"max_level":result} } except Exception as e: db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") # def get_warning_description(level): # if level == 1: # return "无危险" # elif level == 2: # return "较低危险" # elif level == 3: # return "中等危险" # elif level == 4: # return "高火险级" # elif level == 5: # return "最高火险级" # else: # return "未知等级" def get_warning_description(level): if level == 1: return "无危险" elif level == 2: return "较低危险" elif level == 3: return "较高危险" elif level == 4: return "高度危险" elif level == 5: return "极度危险" @router.post('/forest_warrning') async def mine(request: Request,db: Session = Depends(get_db)): try: body = await request.json() max_time = get_forest_max_time(db) result = '' forest_warring = get_forest_warring(db) #print(forest_warring) # 分析数据 high_risk_areas = [] medium_risk_areas = [] low_risk_areas = [] all_low_risk = True # 假设所有区域都是低风险 for item in forest_warring: if item['warning_level'] >= 2: # 只考虑等级大于等于2的 all_low_risk = False level_description = get_warning_description(item['warning_level']) area_report = f"{item['area_name']}的森林火险等级为{level_description}({item['warning_level']}级)" if item['warning_level'] == 2: low_risk_areas.append(area_report) elif item['warning_level'] == 3: medium_risk_areas.append(area_report) elif item['warning_level'] >= 4: high_risk_areas.append(area_report) # 构建描述句子 if all_low_risk: risk_description = "当前无风险" else: risk_description = "" if high_risk_areas: risk_description += ";".join(high_risk_areas) if medium_risk_areas: if risk_description: risk_description += ";" risk_description += ";".join(medium_risk_areas) if low_risk_areas: if risk_description: risk_description += ";" risk_description += ";".join(low_risk_areas) result = '' # 输出结果 if risk_description: result = f"{max_time},{risk_description}" else: result =f"{max_time},当前无风险" #print(result) return { "code": 200, "msg": "成功", "data": {"result":result} } except Exception as e: db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")