wechat.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import requests
  2. import json
  3. import logging
  4. class WeChat(object):
  5. __get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
  6. __send_message_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
  7. __get_userid_url = 'https://qyapi.weixin.qq.com/cgi-bin/user/getuserid?access_token={token}&debug=1'
  8. def __init__(self, corpid, corpsecret, agentid):
  9. self.corpid = corpid
  10. self.corpsecret = corpsecret
  11. self.agentid = agentid
  12. self.token = self.auth()
  13. logging.info('wechat login successful')
  14. def auth(self):
  15. params = {'corpid': self.corpid, 'corpsecret': self.corpsecret}
  16. rs = requests.get(self.__get_token_url, params=params)
  17. return rs.json()['access_token']
  18. def userid(self, mobile: str):
  19. try:
  20. url = self.__get_userid_url.format(token=self.token)
  21. data = json.dumps({'mobile': mobile})
  22. rs = requests.post(url, data=data)
  23. return rs.json()['userid']
  24. except:
  25. print('get userid error!')
  26. def message(self, touser, message):
  27. data = json.dumps({
  28. 'touser': touser,
  29. 'msgtype': 'text',
  30. 'agentid': self.agentid,
  31. 'text': {
  32. 'content': message},
  33. })
  34. return data
  35. def send(self, touser, message):
  36. try:
  37. url = self.__send_message_url.format(token=self.token)
  38. res = requests.post(url, data=self.message(touser, message))
  39. assert res.json()['errcode'] == 0
  40. except Exception as e:
  41. logging.exception('send wechat failed!', exc_info=e)
  42. if __name__ == '__main__':
  43. wechat = WeChat()
  44. wechat.send("LinXinYuan", "测试")