# Helper for sending text messages through the qyapi.weixin.qq.com HTTP API.
import json
import logging
import os

import requests
class WeChat(object):
    """Small client for the qyapi.weixin.qq.com message API.

    An access token is requested once, when the instance is created, and is
    reused by every later call.

    Args:
        corpid: Value sent as ``corpid`` to the token endpoint.
        corpsecret: Value sent as ``corpsecret`` to the token endpoint.
        agentid: Value placed in the ``agentid`` field of outgoing messages.
        timeout: Seconds to wait for each HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely.
    """

    __get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    __send_message_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
    __get_userid_url = 'https://qyapi.weixin.qq.com/cgi-bin/user/getuserid?access_token={token}&debug=1'

    def __init__(self, corpid, corpsecret, agentid, timeout=10):
        self.corpid = corpid
        self.corpsecret = corpsecret
        self.agentid = agentid
        self.timeout = timeout
        self.token = self.auth()
        logging.info('wechat login successful')

    def auth(self):
        """Request an access token and return it.

        Raises:
            KeyError: The response did not contain ``access_token``; the
                response body is logged first so the reason is visible.
            requests.RequestException: The HTTP request failed or timed out.
        """
        params = {'corpid': self.corpid, 'corpsecret': self.corpsecret}
        rs = requests.get(self.__get_token_url, params=params,
                          timeout=self.timeout)
        payload = rs.json()
        if 'access_token' not in payload:
            logging.error('wechat auth failed: %s', payload)
        return payload['access_token']

    def userid(self, mobile: str):
        """Look up the user id associated with a mobile number.

        Returns:
            The ``userid`` field of the response, or ``None`` when the lookup
            fails for any reason (the failure is logged with its traceback).
        """
        try:
            url = self.__get_userid_url.format(token=self.token)
            data = json.dumps({'mobile': mobile})
            rs = requests.post(url, data=data, timeout=self.timeout)
            return rs.json()['userid']
        except Exception:
            # Best-effort lookup: report the failure and let the caller
            # continue, but do not swallow KeyboardInterrupt/SystemExit.
            logging.exception('get userid error!')
            return None

    def message(self, touser, message):
        """Return the JSON request body for a text message to ``touser``."""
        data = json.dumps({
            'touser': touser,
            'msgtype': 'text',
            'agentid': self.agentid,
            'text': {
                'content': message},
        })
        return data

    @staticmethod
    def _check_errcode(payload):
        """Raise RuntimeError unless ``payload`` reports ``errcode`` 0."""
        errcode = payload.get('errcode')
        if errcode != 0:
            raise RuntimeError(
                'wechat api error: errcode={!r}, errmsg={!r}'.format(
                    errcode, payload.get('errmsg')))

    def send(self, touser, message):
        """Send a text message to ``touser``.

        Delivery is best-effort: any failure (transport error, timeout, or a
        non-zero ``errcode`` in the response) is logged and not re-raised.
        """
        try:
            url = self.__send_message_url.format(token=self.token)
            res = requests.post(url, data=self.message(touser, message),
                                timeout=self.timeout)
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            self._check_errcode(res.json())
        except Exception:
            logging.exception('send wechat failed!')
if __name__ == '__main__':
    # Manual smoke test. WeChat() cannot be built without credentials, so they
    # are read from the environment instead of being hard-coded here.
    # NOTE(review): the variable names below are a suggested convention, not
    # something defined elsewhere in this file; adjust to your deployment.
    try:
        corpid = os.environ['WECHAT_CORPID']
        corpsecret = os.environ['WECHAT_CORPSECRET']
        agentid = int(os.environ['WECHAT_AGENTID'])
    except KeyError as missing:
        raise SystemExit('missing environment variable: {}'.format(missing))
    wechat = WeChat(corpid, corpsecret, agentid)
    wechat.send("LinXinYuan", "测试")