database.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # -*- coding: utf-8 -*-
  2. import sqlalchemy
  3. from sqlalchemy import create_engine
  4. from sqlalchemy.ext.declarative import declarative_base
  5. from sqlalchemy.orm import sessionmaker
  6. from config import settings
  7. mysql_dwd_config = {
  8. 'drivername': 'mysql+pymysql',
  9. 'username': settings.MYSQL_USER,
  10. 'password': settings.MYSQL_PASSWORD,
  11. 'host': settings.MYSQL_SERVER,
  12. 'port':settings.MYSQL_PORT,
  13. 'database': settings.MYSQL_DB_NAME
  14. }
  15. if sqlalchemy.__version__ >= '1.4':
  16. mysql_engine_url = sqlalchemy.engine.URL.create(**mysql_dwd_config)
  17. mysql_engine_url = mysql_engine_url.update_query_dict({'charset': 'utf8mb4'})
  18. else:
  19. mysql_engine_url = '{drivername}://{username}:{password}@{host}:{port}/{database}?charset=utf8mb4'.format(**mysql_dwd_config)
  20. engine = create_engine(mysql_engine_url, echo=False, pool_size=100, pool_recycle=3600, pool_pre_ping=True)
  21. SessionLocal = sessionmaker(bind=engine)
  22. Base = declarative_base()
  23. # Dependency
  24. def get_db():
  25. try:
  26. db = SessionLocal()
  27. yield db
  28. finally:
  29. db.close()
  30. def get_db_local():
  31. return SessionLocal()
  32. # from database import engine
  33. # from models.geojson_base import *
  34. # from shapely.geometry import shape
  35. # import json
  36. # import pymysql
  37. # # db = get_db_local()
  38. # conn = pymysql.connect(host=settings.MYSQL_SERVER,
  39. # user=settings.MYSQL_USER,
  40. # password=settings.MYSQL_PASSWORD,
  41. # database=settings.MYSQL_DB_NAME,
  42. # port=settings.MYSQL_PORT,
  43. # charset='utf8mb4')
  44. # cur = conn.cursor()
  45. # with open('/home/python3/zj_geojson.json', 'r', encoding='utf-8') as file:
  46. # geojson = json.load(file)
  47. # features = geojson.get('features', [])
  48. # for feature in features:
  49. # # print(feature)
  50. # name = feature['properties'].get('NAME', '')
  51. # geom = shape(feature['geometry']).__geo_interface__ # Convert the Shapely object to a GeoJSON mapping
  52. # # print(geom)
  53. # properties = json.dumps(feature['properties'], ensure_ascii=False)
  54. # pac = feature['properties'].get('PAC', '')
  55. # sql = """
  56. # INSERT INTO tp_geojson_data_zj (name, geometry, properties,pac)
  57. # VALUES (%s, ST_GeomFromGeoJSON(%s), %s,%s)
  58. # """
  59. # # Execute the insert
  60. # cur.execute(sql, (name, json.dumps(geom), properties,pac))
  61. # conn.commit()
  62. # Commit the transaction
  63. # break
  64. # Commit the transaction
  65. # db.commit()
  66. # # Close the session
  67. # db.close()
  68. #from models.oneshare_base import Base
  69. #from models.knowledge_base import Base
  70. #
  71. # # Use Base's metadata and the engine to create all tables
  72. #Base.metadata.create_all(bind=engine)