# -*- coding: utf-8 -*- import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from config import settings from contextlib import contextmanager mysql_dwd_config = { 'drivername': 'mysql+pymysql', 'username': settings.MYSQL_USER, 'password': settings.MYSQL_PASSWORD, 'host': settings.MYSQL_SERVER, 'port':settings.MYSQL_PORT, 'database': settings.MYSQL_DB_NAME } mysql_dwd_config_sharedb = { 'drivername': 'mysql+pymysql', 'username': settings.MYSQL_USER, 'password': settings.MYSQL_PASSWORD, 'host': settings.MYSQL_SERVER, 'port':settings.MYSQL_PORT, 'database': 'sharedb' } if sqlalchemy.__version__ >= '1.4': mysql_engine_url = sqlalchemy.engine.URL.create(**mysql_dwd_config) mysql_engine_url = mysql_engine_url.update_query_dict({'charset': 'utf8mb4'}) mysql_engine_url_sharedb = sqlalchemy.engine.URL.create(**mysql_dwd_config_sharedb) mysql_engine_url_sharedb = mysql_engine_url_sharedb.update_query_dict({'charset': 'utf8mb4'}) else: mysql_engine_url = '{drivername}://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4'.format(**mysql_dwd_config) mysql_engine_url_sharedb = '{drivername}://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4'.format(**mysql_dwd_config_sharedb) engine = create_engine(mysql_engine_url, echo=False, pool_size=10, pool_recycle=360, pool_pre_ping=True) SessionLocal = sessionmaker(bind=engine) engine_sharedb = create_engine(mysql_engine_url_sharedb, echo=False, pool_size=3, pool_recycle=360, pool_pre_ping=True) SessionLocalShareDb = sessionmaker(bind=engine_sharedb) Base = declarative_base() def get_db_share(): try: db = SessionLocalShareDb() yield db finally: db.close() # Dependency def get_db(): try: db = SessionLocal() yield db finally: db.close() # 适用scheduler @contextmanager def get_local_db(): try: db = SessionLocal() yield db finally: db.close() @contextmanager def get_share_db(): try: db = SessionLocalShareDb() yield db finally: db.close() # def get_db_local(): # return SessionLocal() # from database import engine # from models.geojson_base import * # from shapely.geometry import shape # import json # import pymysql # # db = get_db_local() # conn = pymysql.connect(host=settings.MYSQL_SERVER, # user=settings.MYSQL_USER, # password=settings.MYSQL_PASSWORD, # database=settings.MYSQL_DB_NAME, # port=settings.MYSQL_PORT, # charset='utf8mb4') # cur = conn.cursor() # with open('/home/python3/zj_geojson.json', 'r', encoding='utf-8') as file: # geojson = json.load(file) # features = geojson.get('features', []) # for feature in features: # # print(feature) # name = feature['properties'].get('NAME', '') # geom = shape(feature['geometry']).__geo_interface__ # 将Shapely对象转换为GeoJSON # # print(geom) # properties = json.dumps(feature['properties'], ensure_ascii=False) # pac = feature['properties'].get('PAC', '') # sql = """ # INSERT INTO tp_geojson_data_zj (name, geometry, properties,pac) # VALUES (%s, ST_GeomFromGeoJSON(%s), %s,%s) # """ # # 执行插入操作 # cur.execute(sql, (name, json.dumps(geom), properties,pac)) # conn.commit() # 提交事务 # break # 提交事务 # db.commit() # # 关闭会话 # db.close() #from models.oneshare_base import Base #from models.knowledge_base import Base # # #使用Base的metadata和engine来创建所有表 #Base.metadata.create_all(bind=engine)