123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- import random
- import string
- import hashlib
- import time
- import json
- from utils.redis_util import *
- import requests
- from urllib.parse import quote
- import base64
- from utils import *
- from exceptions import YzyException
- from extensions import logger
- from config import settings
- from sqlalchemy.orm import Session
# Application name: Maoming City Smart Emergency Platform.
# Sample OAuth2 authorize URL for this application:
# https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect

# Base URL that every YZY API request in this module is sent to.
YZY_HOST = "http://19.15.0.128:8080"
# Alternative host (currently disabled):
# YZY_HOST = "https://xtbg.digitalgd.com.cn"

# Application identity used for token retrieval, messaging and OAuth links.
# NOTE(review): credentials are hard-coded in source; consider moving them
# to configuration that is kept out of version control.
YZY_AGENTID = 1004302
YZY_CORPID = "wld341060039"
YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"

# Redis key under which the access token is cached.
YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"

# Values used for the "x-tif-paasid" header and for signing "x-tif-signature".
YZH_PASSID = "yzy_demo"
YZH_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
def __get_access_token():
    """Request a fresh access token from the YZY ``gettoken`` endpoint.

    The request carries the corp id and corp secret as query parameters and
    the ``x-tif-*`` signature headers built from a timestamp and a nonce.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        YzyException: If the HTTP status is not 200, or the response reports
            a non-zero ``errcode``.
    """
    endpoint = "{}/ebus/yzyapi/cgi-bin/gettoken".format(YZY_HOST)
    url = "{}?corpid={}&corpsecret={}".format(endpoint, YZY_CORPID, YZY_CORPSECRET)
    # Never echo the corp secret: print a redacted form of the URL instead.
    print('yzy url:', "{}?corpid={}&corpsecret=***".format(endpoint, YZY_CORPID))

    timestamp = str(int(time.time()))
    nonce = ranstr(20)
    signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce)
    headers = {
        'Content-Type': 'application/json;charset=UTF-8',
        "x-tif-signature": signature,
        "x-tif-timestamp": timestamp,
        "x-tif-nonce": nonce,
        "x-tif-paasid": YZH_PASSID,
    }
    response = requests.get(url, headers=headers, timeout=15)

    # Fail loudly on transport-level errors instead of implicitly returning
    # None, which callers would otherwise treat (and cache) as a token.
    if response.status_code != 200:
        print('yzy return:', response.text)
        raise YzyException(
            errcode=response.status_code,
            errmsg="gettoken failed with HTTP status {}".format(response.status_code),
        )

    result = response.json()
    errcode = int(result['errcode'])
    if errcode != 0:
        # Only error bodies are printed; a success body contains the token.
        print('yzy return:', response.text)
        raise YzyException(errcode=errcode, errmsg=result['errmsg'])
    return result['access_token']
def get_cache_access_token():
    """Return the access token, using Redis as a cache.

    Looks up ``YZY_ACCESS_TOKEN_REDIS_KEY`` via ``redis_get``; when nothing is
    cached, fetches a new token with ``__get_access_token`` and stores it via
    ``redis_set_with_time`` with a lifetime argument of 3600 (presumably
    seconds; confirm against the helper).

    Returns:
        The cached or freshly fetched token. May be falsy if the fetch did
        not yield a token; such a value is never written to the cache.
    """
    access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
    if access_token is None:
        access_token = __get_access_token()
        # Only cache a real token: storing None/"" would poison the cache
        # for the whole lifetime of the key.
        if access_token:
            redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, 3600)

    return access_token
def get_user_info(code: str):
    """Exchange an OAuth ``code`` for user information.

    Args:
        code: The authorization code received from the OAuth redirect.

    Returns:
        Whatever ``__post_url__`` returns for the ``getuserinfo`` endpoint
        (the decoded JSON response, or None on a non-200 HTTP status).
    """
    access_token = get_cache_access_token()
    # The code arrives from outside this service; percent-encode it so that
    # characters such as "&" or "#" cannot alter the query string.
    safe_code = quote(str(code), safe="")
    url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(
        YZY_HOST, access_token, safe_code)
    return __post_url__(url, {})
def send_text_message(users, content: str):
    """Send a plain-text message to one or more users.

    Args:
        users: A single user id, or an iterable of user ids. Multiple ids are
            joined with ``|`` for the ``touser`` field.
        content: The message text.

    Returns:
        Whatever ``__post_url__`` returns for the ``message/send`` endpoint.
    """
    access_token = get_cache_access_token()
    url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token)

    # A lone string must not be iterated (that would yield "a|b|c" for "abc"),
    # and ids are coerced with str() so non-string ids do not break the join.
    if isinstance(users, str) or not hasattr(users, "__iter__"):
        touser = str(users)
    else:
        touser = "|".join(str(user) for user in users)

    data = {
        "touser": touser,
        "msgtype": "text",
        "agentid": YZY_AGENTID,
        "text": {
            "content": content,
        },
    }
    return __post_url__(url, data)
def send_textcard_message(users, title: str, description: str, detail_url):
    """Send a text-card message to one or more users.

    Args:
        users: A single user id, or an iterable of user ids. Multiple ids are
            joined with ``|`` for the ``touser`` field.
        title: Card title.
        description: Card description.
        detail_url: URL placed in the card's ``url`` field.

    Returns:
        Whatever ``__post_url__`` returns for the ``message/send`` endpoint.
    """
    access_token = get_cache_access_token()
    url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token)

    # Treat any non-string iterable (list, tuple, set, ...) as a collection of
    # ids; previously only ``list`` was joined and e.g. a tuple was sent as
    # its repr. Scalars are stringified as before.
    if isinstance(users, str) or not hasattr(users, "__iter__"):
        touser = str(users)
    else:
        touser = "|".join(str(user) for user in users)

    data = {
        "touser": touser,
        "msgtype": "textcard",
        "agentid": YZY_AGENTID,
        "textcard": {
            "title": title,
            "description": description,
            "url": detail_url,
        },
    }
    return __post_url__(url, data)
def __post_url__(url, data):
    """POST ``data`` as a UTF-8 JSON body to ``url`` with ``x-tif-*`` headers.

    Args:
        url: Full request URL, including any query parameters.
        data: JSON-serializable payload.

    Returns:
        The decoded JSON response when the HTTP status is 200, else None.
    """
    print('yzy url:', url)
    print('yzy data:', data)
    # ensure_ascii=False keeps non-ASCII text verbatim, so encode explicitly.
    body = json.dumps(data, ensure_ascii=False).encode('utf-8')

    ts = str(int(time.time()))
    nonce = ranstr(20)
    sign = calcResponseSign(ts, YZH_PASSTOKEN, nonce)
    request_headers = {
        'Content-Type': 'application/json;charset=UTF-8',
        "x-tif-signature": sign,
        "x-tif-timestamp": ts,
        "x-tif-nonce": nonce,
        "x-tif-paasid": YZH_PASSID,
    }
    response = requests.post(url, data=body, headers=request_headers, timeout=5)
    print('yzy return:', response.text)

    if response.status_code != 200:
        return None

    # NOTE: errcode validation (raising YzyException on a non-zero errcode)
    # is disabled here; the raw result dict is handed back to the caller.
    return response.json()
-
def ranstr(num):
    """Return a random string of ``num`` ASCII letters and digits.

    Characters are drawn independently (repeats allowed) from the operating
    system's secure random source, so any length is supported and the result
    is suitable for use as a request nonce.

    Args:
        num: Desired length of the string.

    Returns:
        A string of length ``num`` (empty when ``num`` is 0).
    """
    alphabet = string.ascii_letters + string.digits
    # random.sample() could not exceed len(alphabet) characters and never
    # repeated one; SystemRandom.choice has neither limitation.
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(num))
#
#
# Verify the signature of an incoming request
#
#
def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
    """Check ``signature`` against the value computed by ``calcRequestSign``.

    Args:
        timestamp, token, nonce, uid, uinfo, ext: Passed unchanged to
            ``calcRequestSign`` to compute the expected signature.
        signature: The signature supplied with the request; compared
            case-insensitively (it is upper-cased before comparison).

    Returns:
        True when the signatures match, False otherwise (including when
        ``signature`` is missing or not a string).
    """
    import hmac  # local import: hmac is not among the module-level imports

    if not isinstance(signature, str):
        return False

    expected = calcRequestSign(timestamp, token, nonce, uid, uinfo, ext)
    # Constant-time comparison avoids leaking how many leading characters of
    # a forged signature were correct. Bytes are compared because
    # compare_digest rejects non-ASCII str input.
    return hmac.compare_digest(
        expected.encode("utf-8"),
        signature.upper().encode("utf-8", "replace"),
    )
#
#
# Compute the signature
#
#
def calcResponseSign(timestamp, token, nonce):
    """Return the upper-case SHA-256 hex digest of timestamp+token+nonce+timestamp."""
    payload = f"{timestamp}{token}{nonce}{timestamp}".encode("utf8")
    digest = hashlib.sha256(payload)
    return digest.hexdigest().upper()
#
#
# Compute the request signature
#
#
def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
    """Return the upper-case SHA-256 hex digest used to sign a request.

    The signed text is
    ``{timestamp}{token}{nonce},{uid},{uinfo},{ext}{timestamp}``, where an
    empty or missing ``uinfo`` / ``ext`` contributes nothing between its
    delimiters.

    Args:
        timestamp: Request timestamp; appears at both ends of the signed text.
        token: Shared secret token.
        nonce: Request nonce.
        uid: User id field.
        uinfo: User info field; ``None`` is treated as empty.
        ext: Extension field; ``None`` is treated as empty.

    Returns:
        The 64-character upper-case hex digest.
    """
    # None would previously crash in len(); treat it as an absent field.
    uinfo = "" if uinfo is None else uinfo
    ext = "" if ext is None else ext
    sign_data = "{}{}{},{},{},{}{}".format(
        timestamp, token, nonce, uid, uinfo, ext, timestamp)
    return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()
def format_redirect_url(redirect_url: str) -> str:
    """Build the OAuth2 authorize URL that carries ``redirect_url`` in its state.

    The ``state`` parameter is the Base64 encoding of a JSON object holding
    ``redirect_url`` and a fresh value from ``new_guid()``. The callback
    address comes from ``settings.YJXP_CALLBACK_WEB_PATH`` (percent-encoded
    with ``quote``).

    Args:
        redirect_url: The URL to embed in the state payload.

    Returns:
        The complete authorize URL.
    """
    callback = quote(settings.YJXP_CALLBACK_WEB_PATH)
    logger.info("yzy_callback_url: {}", callback)

    state_payload = json.dumps({
        "redirect_url": redirect_url,
        "rnd": new_guid(),
    })
    state = base64.b64encode(state_payload.encode('utf-8')).decode('utf-8')

    template = (
        "https://open.weixin.qq.com/connect/Oauth2/authorize"
        "?appid={}&redirect_uri={}&response_type=code&scope=snsapi_base"
        "&agentid={}&state={}#wechat_redirect"
    )
    detail_url = template.format(YZY_CORPID, callback, YZY_AGENTID, state)

    logger.info("detail_url: {}", detail_url)
    return detail_url
-
def add_to_msg_queue(db: Session, data: dict) -> None:
    """Insert a new, unsent message row into the message queue.

    Args:
        db: Session used to add and commit the row.
        data: Keyword fields for ``YzyMsgQueue``; ``sent_status`` (0) and
            ``create_time`` (now) are supplied here and must not be in it.

    Raises:
        Exception: Whatever the commit raises, after the session has been
            rolled back.
    """
    new_msg = YzyMsgQueue(**data, sent_status=0, create_time=datetime.now())
    db.add(new_msg)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in an invalid transaction state;
        # roll back so the caller's session stays usable, then re-raise.
        db.rollback()
        raise
|