#!/usr/bin/python3 # -*- coding: utf-8 -*- from redis import StrictRedis from config import settings import json import time def get_redis(): redis = StrictRedis(host=settings.REDIS_DB_URL['host'], port=settings.REDIS_DB_URL['port'], username=settings.REDIS_DB_URL['user'], db=settings.REDIS_DB_URL['db'], password=settings.REDIS_DB_URL['password']) return redis def redis_set(name: str, val: str): redis = get_redis() redis.set(name, val) def redis_set_with_time(name: str, val: str, seconds: int): redis = get_redis() redis_set(name, val) redis.expire(name, seconds) def redis_get(name: str): redis = get_redis() val = redis.get(name) if val is not None: val = bytes.decode(val) return val def redis_lock(name: str, secs: int = 60): redis = get_redis() ret = redis.setnx(name, "1") if ret: redis.expire(name, secs) time.sleep(1) return ret def redis_unlock(name: str): val = redis_get(name) if val is not None and val == "1": redis = get_redis() redis.delete(name) def redis_set_json(name: str, val: dict, secs: int = 3600): json_str = json.dumps(val, ensure_ascii=False) redis_set_with_time(name, json_str, secs) def redis_get_json(name: str): json_str = redis_get(name) if json_str is not None: return json.loads(json_str) if __name__ == '__main__': try: # 连接 Redis(指定 username 和 password) r = StrictRedis( host='localhost', port=6379, username='default', # Redis 6.0+ 支持 password='c0b0Info' ) # 测试连接 r.ping() print("✅ 连接成功!") # 执行一些操作 r.set('test_key', 'hello') print("test_key:", r.get('test_key').decode('utf-8')) except Exception as e: print(f"❌ 连接失败: {e}")