123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_,text
- from sqlalchemy.sql import func
- from sqlalchemy.future import select
- from common.auth_user import *
- from pydantic import BaseModel
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- from utils.ry_system_util import *
- from utils.video_util import *
- from collections import defaultdict
- import traceback
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from jobs.rainfall_conditions_job import get_stcd_data
- from multiprocessing import Pool, cpu_count
- import json
- import time
- import math
- def get_real_code(db,longitude,latitude):
- sql = text("""
- SELECT area_name, `code`, longitude, latitude, distance FROM (
- SELECT
- area_name,
- `code`,
- longitude,
- latitude,
- ST_Distance_Sphere(
- ST_GeomFromText(CONCAT('POINT(', longitude, ' ', latitude, ')')),
- ST_GeomFromText(:point)
- ) AS distance
- FROM govdata_real_time_address
- WHERE longitude IS NOT NULL AND latitude IS NOT NULL
- ORDER BY distance ASC
- LIMIT 1
- ) T
- """).bindparams(point=f"POINT({longitude} {latitude})")
- # 执行查询
- result = db.execute(sql).fetchone()
- # 处理结果
- if result:
- return dict(result)['code']
- else:
- return None
- # def get_rainfall(
- # code: str,num:int,
- # db: Session = Depends(get_db)
- # ):
- # value=0
- # rainfulldata = get_stcd_data(code,num+1)
- # for i in rainfulldata:
- # value += i['F3070220000034_000018005']
- #
- # return value
- def get_rainfall(
- code: str,num:int=24,
- db: Session = Depends(get_db)
- ):
- value=0
- now = datetime.now()
- twenty_four_hours_ago = now - timedelta(hours=num+1)
- formatted_now = f"{now.strftime('%Y-%m-%d %H')}:00:00"
- formatted_twenty_four_hours_ago = f"{twenty_four_hours_ago.strftime('%Y-%m-%d %H')}:00:00"
- sql = text("""SELECT SUM(rainfall) as rainfall FROM sharedb.`govdata_rain_data_info` where `code`=:code and create_time BETWEEN :formatted_twenty_four_hours_ago and :formatted_now""").bindparams(code=code,formatted_twenty_four_hours_ago=formatted_twenty_four_hours_ago,formatted_now=formatted_now)
- # 执行查询
- # print(sql,formatted_now,formatted_twenty_four_hours_ago)
- result = db.execute(sql).fetchone()
- # 处理结果
- if result:
- return dict(result)['rainfall']
- else:
- return 0
- # rainfulldata = get_stcd_data(code,num+1)
- # for i in rainfulldata:
- # value += i['F3070220000034_000018005']
- #
- # return value
- def get_weather_warning(area_name : str,db: Session = Depends(get_db)):
- area_name= area_name.replace('区','').replace('市','').replace('技术开发区','').replace('新区','')
- sql = text(
- """SELECT CONCAT(`type`,`state`) as `weather_warning_type`,levelnum as `weather_warninglevel` FROM sharedb.weather_latest_warnings where county = :county or county='茂名' ORDER BY `levelnum` desc LIMIT 1""").bindparams(
- county=area_name)
- # 执行查询
- # print(sql,formatted_now,formatted_twenty_four_hours_ago)
- result = db.execute(sql).fetchone()
- # 处理结果
- if result:
- return dict(result)['weather_warning_type'],dict(result)['weather_warninglevel']
- else:
- return '-','-'
|