123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- # -*- coding: utf-8 -*-
import re
from contextlib import contextmanager
from urllib.parse import quote

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from config import settings
# Connection parameters for the primary MySQL database, read from project settings.
mysql_dwd_config = {
    'drivername': 'mysql+pymysql',
    'username': settings.MYSQL_USER,
    'password': settings.MYSQL_PASSWORD,
    'host': settings.MYSQL_SERVER,
    'port': settings.MYSQL_PORT,
    'database': settings.MYSQL_DB_NAME,
}
# Same server and credentials as above, but targeting the fixed 'sharedb' database.
mysql_dwd_config_sharedb = dict(mysql_dwd_config, database='sharedb')

# String form of the engine URL, used when ``URL.create`` is not available.
_LEGACY_URL_TEMPLATE = '{drivername}://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4'


def _supports_url_create():
    """Return True when the installed SQLAlchemy reports version 1.4 or newer.

    The (major, minor) pair is compared numerically; comparing the raw
    version strings is lexicographic and would order '1.10' before '1.4'.
    A version string that cannot be parsed is treated as "old", so the
    string-URL path is taken.
    """
    parsed = re.match(r'(\d+)\.(\d+)', sqlalchemy.__version__)
    if parsed is None:
        return False
    return tuple(int(part) for part in parsed.groups()) >= (1, 4)


def _build_engine_url(config):
    """Build the engine URL for *config*, always requesting the utf8mb4 charset.

    Args:
        config: Mapping with the keys ``drivername``, ``username``,
            ``password``, ``host``, ``port`` and ``database``.

    Returns:
        A ``sqlalchemy.engine.URL`` on SQLAlchemy >= 1.4, otherwise a URL string.
    """
    if _supports_url_create():
        url = sqlalchemy.engine.URL.create(**config)
        return url.update_query_dict({'charset': 'utf8mb4'})
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot be mistaken for URL delimiters when the string is parsed.
    escaped = dict(
        config,
        username=quote(str(config['username']), safe=''),
        password=quote(str(config['password']), safe=''),
    )
    return _LEGACY_URL_TEMPLATE.format(**escaped)


mysql_engine_url = _build_engine_url(mysql_dwd_config)
mysql_engine_url_sharedb = _build_engine_url(mysql_dwd_config_sharedb)

# Primary engine and its session factory.
engine = create_engine(mysql_engine_url, echo=False, pool_size=10, pool_recycle=360, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine)

# Engine and session factory for 'sharedb'; configured with a smaller pool.
engine_sharedb = create_engine(mysql_engine_url_sharedb, echo=False, pool_size=3, pool_recycle=360, pool_pre_ping=True)
SessionLocalShareDb = sessionmaker(bind=engine_sharedb)

# Declarative base class for ORM models.
Base = declarative_base()
def get_db_share():
    """Yield a session from ``SessionLocalShareDb`` and close it afterwards.

    This is a plain generator: the session is closed when the generator is
    resumed after the ``yield`` or is finalized.

    Yields:
        A session created by ``SessionLocalShareDb``.
    """
    # Create the session before entering ``try``: if the factory raises there
    # is nothing to close, and the original error must not be masked by an
    # UnboundLocalError raised from the ``finally`` clause.
    db = SessionLocalShareDb()
    try:
        yield db
    finally:
        db.close()
# Dependency
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards.

    This is a plain generator: the session is closed when the generator is
    resumed after the ``yield`` or is finalized.
    NOTE(review): presumably consumed as a web-framework dependency, per the
    comment above — confirm against callers.

    Yields:
        A session created by ``SessionLocal``.
    """
    # Create the session before entering ``try``: if the factory raises there
    # is nothing to close, and the original error must not be masked by an
    # UnboundLocalError raised from the ``finally`` clause.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
# For use by the scheduler
@contextmanager
def get_local_db():
    """Context manager that provides a ``SessionLocal`` session.

    Usage: ``with get_local_db() as db: ...``. The session is closed when the
    ``with`` block exits, whether normally or through an exception.

    Yields:
        A session created by ``SessionLocal``.
    """
    # Create the session before entering ``try``: if the factory raises there
    # is nothing to close, and the original error must not be masked by an
    # UnboundLocalError raised from the ``finally`` clause.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
@contextmanager
def get_share_db():
    """Context manager that provides a ``SessionLocalShareDb`` session.

    Usage: ``with get_share_db() as db: ...``. The session is closed when the
    ``with`` block exits, whether normally or through an exception.

    Yields:
        A session created by ``SessionLocalShareDb``.
    """
    # Create the session before entering ``try``: if the factory raises there
    # is nothing to close, and the original error must not be masked by an
    # UnboundLocalError raised from the ``finally`` clause.
    db = SessionLocalShareDb()
    try:
        yield db
    finally:
        db.close()
- # def get_db_local():
- # return SessionLocal()
- # from database import engine
- # from models.geojson_base import *
- # from shapely.geometry import shape
- # import json
- # import pymysql
- # # db = get_db_local()
- # conn = pymysql.connect(host=settings.MYSQL_SERVER,
- # user=settings.MYSQL_USER,
- # password=settings.MYSQL_PASSWORD,
- # database=settings.MYSQL_DB_NAME,
- # port=settings.MYSQL_PORT,
- # charset='utf8mb4')
- # cur = conn.cursor()
- # with open('/home/python3/zj_geojson.json', 'r', encoding='utf-8') as file:
- # geojson = json.load(file)
- # features = geojson.get('features', [])
- # for feature in features:
- # # print(feature)
- # name = feature['properties'].get('NAME', '')
- # geom = shape(feature['geometry']).__geo_interface__ # 将Shapely对象转换为GeoJSON
- # # print(geom)
- # properties = json.dumps(feature['properties'], ensure_ascii=False)
- # pac = feature['properties'].get('PAC', '')
- # sql = """
- # INSERT INTO tp_geojson_data_zj (name, geometry, properties,pac)
- # VALUES (%s, ST_GeomFromGeoJSON(%s), %s,%s)
- # """
- # # 执行插入操作
- # cur.execute(sql, (name, json.dumps(geom), properties,pac))
- # conn.commit()
- # 提交事务
- # break
- # 提交事务
- # db.commit()
- # # 关闭会话
- # db.close()
- #from models.oneshare_base import Base
- #from models.knowledge_base import Base
- #
- # #使用Base的metadata和engine来创建所有表
- #Base.metadata.create_all(bind=engine)
|