#!/usr/bin/python3 # -*- coding: utf-8 -*- import random import string import hashlib import time import json from utils.redis_util import * import requests from urllib.parse import quote import base64 from utils import * from exceptions import YzyException from extensions import logger from config import settings from sqlalchemy.orm import Session # 应用名称:茂名市智慧应急平台 # https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect YZY_HOST = "http://19.15.0.128:8080" # YZY_HOST = "https://xtbg.digitalgd.com.cn" YZY_AGENTID = 1004302 YZY_CORPID = "wld341060039" YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA" YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY" YZH_PASSID = "yzy_demo" YZH_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL" def __get_access_token(): url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(YZY_HOST, YZY_CORPID, YZY_CORPSECRET) print('yzy url:', url) timestamp = str(int(time.time())) nonce = ranstr(20) signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce) headers = { 'Content-Type': 'application/json;charset=UTF-8', "x-tif-signature": signature, "x-tif-timestamp": timestamp, "x-tif-nonce": nonce, "x-tif-paasid": YZH_PASSID } response = requests.get(url, headers=headers, timeout=15) print('yzy return:', response.text) if response.status_code == 200 : result = response.json() errcode = int(result['errcode']) if errcode == 0: return result['access_token'] else: raise YzyException(errcode=errcode, errmsg=result['errmsg']) def get_cache_access_token(): access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY) if access_token is None: access_token = __get_access_token() redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, 3600) return access_token def get_user_info(code: str): access_token = get_cache_access_token() url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(YZY_HOST, access_token, code) return __post_url__(url, {}) def send_text_message(users, content: str): access_token = get_cache_access_token() url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token) data = { "touser": "|".join(users), "msgtype" : "text", "agentid" : YZY_AGENTID, "text": { "content": content } } return __post_url__(url, data) def send_textcard_message(users, title: str, description: str, detail_url): access_token = get_cache_access_token() url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token) touser = "" if isinstance(users, list) == True: touser = "|".join(users) else: touser = str(users) data = { "touser": touser, "msgtype" : "textcard", "agentid" : YZY_AGENTID, "textcard": { "title": title, "description": description, "url": detail_url } } return __post_url__(url, data) def __post_url__(url, data): print('yzy url:', url) print('yzy data:', data) json_str = json.dumps(data, ensure_ascii=False) data = json_str.encode('utf-8') timestamp = str(int(time.time())) nonce = ranstr(20) signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce) headers = { 'Content-Type': 'application/json;charset=UTF-8', "x-tif-signature": signature, "x-tif-timestamp": timestamp, "x-tif-nonce": nonce, "x-tif-paasid": YZH_PASSID } response = requests.post(url, data=data,headers=headers, timeout=5) print('yzy return:', response.text) if response.status_code == 200 : result = response.json() return result ''' errcode = int(result['errcode']) if errcode == 0: return True else: raise YzyException(errcode=errcode, errmsg=result['errmsg']) ''' def ranstr(num): salt = ''.join(random.sample( string.ascii_letters + string.digits, num)) return salt # # # 生成校验码 # # def authentication(timestamp, token, nonce, uid, uinfo, ext, signature): sign_data_sha256 = calcRequestSign( timestamp, token, nonce, uid, uinfo, ext) return sign_data_sha256 == signature.upper() # # # 计算校验码 # # def calcResponseSign(timestamp, token, nonce): sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp) return hashlib.sha256( sign_data.encode("utf8") ).hexdigest().upper() # # # 计算校验码 # # def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext): sign_data = "{}{}{},{},".format( timestamp, token, nonce, uid) if len(uinfo) == 0: sign_data = sign_data + "," else: sign_data = sign_data + uinfo + "," if len(ext) == 0: sign_data = sign_data else: sign_data = sign_data + ext sign_data = sign_data + timestamp return hashlib.sha256( sign_data.encode("utf8") ).hexdigest().upper() def format_redirect_url(redirect_url: str) -> str: yzy_callback_url = quote(settings.YJXP_CALLBACK_WEB_PATH) logger.info("yzy_callback_url: {}", yzy_callback_url) state_json = { "redirect_url": redirect_url, "rnd": new_guid() } state_str = json.dumps(state_json) state = base64.b64encode(state_str.encode('utf-8')).decode('utf-8') print('state_base64:', state) detail_url = "https://open.weixin.qq.com/connect/Oauth2/authorize?appid={}&redirect_uri={}&response_type=code&scope=snsapi_base&agentid={}&state={}#wechat_redirect".format( YZY_CORPID, yzy_callback_url, YZY_AGENTID, state) logger.info("detail_url: {}", detail_url) return detail_url def add_to_msg_queue(db: Session, data: dict) -> None: new_msg = YzyMsgQueue(**data, sent_status = 0, create_time = datetime.now()) db.add(new_msg) db.commit()