浏览代码

ncov-auto-punch

ignatz 3 年之前
当前提交
cf715a924b
共有 11 个文件被更改,包括 317 次插入0 次删除
  1. 3 0
      .gitignore
  2. 32 0
      conf.py
  3. 37 0
      install.sh
  4. 53 0
      lib/send.py
  5. 54 0
      lib/wechat.py
  6. 28 0
      ncov_auto_punch.py
  7. 20 0
      punch.sh
  8. 2 0
      requirements.txt
  9. 65 0
      robot/chrome.py
  10. 18 0
      robot/loc.py
  11. 5 0
      robot/url.py

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+__pycache__/
+log/
+driver/

+ 32 - 0
conf.py

@@ -0,0 +1,32 @@
+import os
+import sys
+from os.path import join
+
+smtp = {
+    "sender": "疫情防控通小助手 <ncov_robot@163.com>",
+    "server": "smtp.163.com",
+    "port": 465,
+    "user": "ncov_robot@163.com",
+    "password": "***REMOVED***"
+}
+wechat = {
+    "corpid": "***REMOVED***",
+    "corpsecret": "***REMOVED***",
+    "agentid": "***REMOVED***"
+}
+
+class path:
+    base = os.path.dirname(sys.argv[0])
+    geckodriver = join(base, 'driver/geckodriver')
+    chromedriver = join(base, 'driver/chromedriver')
+    log = join(base, 'log/punch.log')
+    geckodriver_log = join(base, 'log/geckodriver.log')
+    chromedriver_log = join(base, 'log/chromedriver.log')
+    conf = join(base, 'conf/conf.json')
+    users = join(base, 'conf/users.json')
+
+class text:
+    subject = '疫情防控通自动打卡结果'
+    exc = '自动打卡异常!'
+    ok = '自动打卡成功'
+    already = '已打卡过'

+ 37 - 0
install.sh

@@ -0,0 +1,37 @@
+sudo apt-get update
+sudo apt-get install wget curl unzip python3 python3-pip xvfb -y
+
+cd `dirname $0`
+if [[ ! `dpkg -l | grep chrome` ]]
+then
+    echo 'install google-chrome'
+    deb_url='https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb'
+    wget $deb_url -O –
+    sudo dpkg -i –
+    rm -f –
+fi
+
+echo 'download chromedriver...'
+chrome_version=`google-chrome --version | awk -F '[ .]' '/^Google Chrome [0-9]+(.[0-9])*/ {print $3}'`
+if [[ ! $chrome_version ]]
+then
+    echo 'google-chrome not found'
+    exit 1
+fi
+[[ ! -d driver ]] && mkdir driver
+if [[ -f driver/chromedriver ]]
+then
+    driver_version=`driver/chromedriver --version | awk -F '[ .]' '/^ChromeDriver [0-9]+(.[0-9])* */ {print $2}'`
+fi
+if [[ $chrome_version -ne $driver_version ]]
+then
+    driver_version_url=https://chromedriver.storage.googleapis.com/LATEST_RELEASE_$chrome_version
+    driver_version=`curl $driver_version_url`
+    driver_url=https://chromedriver.storage.googleapis.com/$driver_version/chromedriver_linux64.zip
+    wget -O chromedriver_linux64.zip $driver_url
+    unzip -d driver -o chromedriver_linux64.zip
+    rm -f chromedriver_linux64.zip
+fi
+
+echo 'install python requirements...'
+pip3 install -r requirements.txt

+ 53 - 0
lib/send.py

@@ -0,0 +1,53 @@
+from email.mime.text import MIMEText
+import logging
+import smtplib
+from .wechat import WeChat
+
+
+class Sender:
+    def __init__(self, smtp_options=None, wechat_options=None) -> None:
+        self.smtp = None
+        self.wechat = None
+        if smtp_options:
+            try:
+                self.__init_smtp(**smtp_options)
+            except Exception as e:
+                logging.exception('smtp login failed', exc_info=e)
+        if wechat_options:
+            try:
+                self.wechat = WeChat(**wechat_options)
+            except Exception as e:
+                logging.exception('wechat login failed', exc_info=e)
+        
+    def __init_smtp(self, server, port, user, password, sender):
+        self.smtp = smtplib.SMTP_SSL(server, port)
+        self.smtp.login(user, password)
+        self.smtp_from = sender
+        logging.info('smtp login successful')
+    
+    def send_mail(self, to, text, subject=None):
+        if not self.smtp: return
+        msg = MIMEText(text, 'plain', 'utf-8')
+        if subject:
+            msg['Subject'] = subject
+            msg['From'] = self.smtp_from
+            msg['To'] = to
+        try:
+            self.smtp.sendmail(self.smtp.user, to, msg.as_bytes())
+            logging.info('send mail successful')
+        except Exception as e:
+            logging.exception('send mail failed', exc_info=e)
+
+    def send_wechat(self, touser, msg):
+        if not self.wechat: return
+        try:
+            self.wechat.send(touser, msg)
+            logging.info('send wechat successful')
+        except Exception as e:
+            logging.exception('send wechat failed', exc_info=e)
+    
+    def send(self, msg, subject=None, mail=None, wechat=None, phone=None):
+        if mail:
+            self.send_mail(mail, msg, subject)
+        if wechat:
+            self.send_wechat(wechat, msg)

+ 54 - 0
lib/wechat.py

@@ -0,0 +1,54 @@
+import requests
+import json
+import logging
+
+
+class WeChat(object):
+    __get_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
+    __send_message_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
+    __get_userid_url = 'https://qyapi.weixin.qq.com/cgi-bin/user/getuserid?access_token={token}&debug=1'
+
+    def __init__(self, corpid, corpsecret, agentid):
+        self.corpid = corpid
+        self.corpsecret = corpsecret
+        self.agentid = agentid
+        self.token = self.auth()
+        logging.info('wechat login successful')
+    
+    def auth(self):
+        params = {'corpid': self.corpid, 'corpsecret': self.corpsecret}
+        rs = requests.get(self.__get_token_url, params=params)
+        return rs.json()['access_token']
+    
+    def userid(self, mobile: str):
+        try:
+            url = self.__get_userid_url.format(token=self.token)
+            data = json.dumps({'mobile': mobile})
+            rs = requests.post(url, data=data)
+            return rs.json()['userid']
+        except:
+            print('get userid error!')
+        
+
+    def message(self, touser, message):
+        data = json.dumps({
+            'touser': touser,
+            'msgtype': 'text',
+            'agentid': self.agentid,
+            'text': {
+                'content': message},
+        })
+        return data
+
+    def send(self, touser, message):
+        try:
+            url = self.__send_message_url.format(token=self.token)
+            res = requests.post(url, data=self.message(touser, message))
+            assert res.json()['errcode'] == 0
+        except Exception as e:
+            logging.exception('send wechat failed!', exc_info=e)
+
+
+if __name__ == '__main__':
+    wechat = WeChat()
+    wechat.send("LinXinYuan", "测试")

+ 28 - 0
ncov_auto_punch.py

@@ -0,0 +1,28 @@
+import json
+import logging
+import sys
+import conf
+from conf import path, text
+from robot.chrome import Robot
+from lib.send import Sender
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO, filename=path.log,
+        format='%(asctime)s [%(levelname)s] %(message)s')
+    sender = Sender(conf.smtp, conf.wechat)
+    robot = Robot()
+    with open(sys.argv[1]) as f:
+        users = json.load(f)
+    for user in users:
+        try:
+            result = robot.punch(**user['punch'])
+            logging.info('result: ' + result)
+        except Exception as e:
+            result = text.exc
+            logging.exception(result, exc_info=e)
+        finally:
+            result = user['punch']['username'] + result
+            sender.send(result, text.subject, **user['contact'])
+            print(result)
+

+ 20 - 0
punch.sh

@@ -0,0 +1,20 @@
+# punch.sh
+
+find_free_servernum() {
+    i=99
+    while [ -f /tmp/.X$i-lock ]
+    do
+        i=$(($i + 1))
+    done
+    return $i
+}
+
+base=`dirname $0`
+logdir=$base/log
+[[ ! -d $logdir ]] && mkdir $logdir
+find_free_servernum
+servernum=$?
+Xvfb :$servernum &
+pid=$!
+DISPLAY=:$servernum python3 $base/ncov_auto_punch.py $1
+kill -15 $pid

+ 2 - 0
requirements.txt

@@ -0,0 +1,2 @@
+selenium==4.3.0
+requests==2.28.1

+ 65 - 0
robot/chrome.py

@@ -0,0 +1,65 @@
+from selenium import webdriver
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.common.keys import Keys
+from selenium.webdriver.support.ui import WebDriverWait
+from selenium.webdriver.support import expected_conditions as EC
+import logging
+
+from conf import text, path
+from . import url, loc
+
+
+class Robot:
+    def __init__(self):
+        service = Service(path.chromedriver, log_path=path.chromedriver_log)
+        self.driver = webdriver.Chrome(service=service)
+        self.driver.implicitly_wait(10)
+        self.wait = WebDriverWait(self.driver, 10)
+        
+    def wait_to_click(self, loc):
+        return self.wait.until(EC.element_to_be_clickable(loc))
+    
+    def login(self, username, password):
+        logging.info('login...')
+        self.driver.get(url.logout)
+        self.driver.get(url.login)
+        self.driver.switch_to.frame('loginIframe')
+        self.wait_to_click(loc.username).send_keys(username)
+        self.wait_to_click(loc.password).send_keys(password + Keys.RETURN)
+        self.wait.until(EC.url_to_be(url.uc_index))
+        logging.info('login successful')
+    
+    def punch(self, username, password, at_school, risky, geolocation):
+        self.login(username, password)
+        self.driver.execute_cdp_cmd('Emulation.setGeolocationOverride', geolocation)
+        self.driver.get(url.ncov)
+        school_loc = loc.is_at_school if at_school else loc.not_at_school
+        risk_loc = loc.risky if risky else loc.no_risk
+        self.wait_to_click(school_loc).click()
+        self.wait_to_click(risk_loc).click()
+        self.wait_to_click(loc.geolocation).click()
+        self.wait.until(EC.invisibility_of_element_located((loc.loading)))
+        btn = self.wait_to_click(loc.submit_button)
+        btn.click()
+        if not btn.get_attribute('class'):
+            self.wait_to_click(loc.submit_confirm_button).click()
+            self.wait.until(EC.invisibility_of_element_located(loc.loading))
+        result = self.wait.until(EC.visibility_of_element_located(loc.result)).text
+        if '成功' in result:
+            result = text.ok
+        if '已提交过' in result:
+            result = text.already
+        return result
+    
+    def quit(self):
+        self.driver.quit()
+    
+    def test_geolocation(self, geolocation):
+        self.driver.get(url.geolocation)
+        self.driver.execute_cdp_cmd('Emulation.setGeolocationOverride', geolocation)
+        self.driver.switch_to.frame('iframeResult')
+        self.wait_to_click((loc.By.TAG_NAME, 'button')).click()
+        from time import sleep
+        sleep(5)
+        result = self.driver.find_element(loc.By.ID, 'demo').text
+        return result

+ 18 - 0
robot/loc.py

@@ -0,0 +1,18 @@
+# loc.py
+
+from selenium.webdriver.common.by import By
+
+username = By.ID, 'username'
+password = By.ID, 'password'
+
+_at_school = '/html/body/div[1]/div/div/section/div[4]/ul/li[4]/div/div/div[{}]/span[1]'
+is_at_school = By.XPATH, _at_school.format(1)
+not_at_school = By.XPATH, _at_school.format(2)
+_risk = '/html/body/div[1]/div/div/section/div[4]/ul/li[9]/div/div/div[{}]/span[1]'
+risky = By.XPATH, _risk.format(1)
+no_risk = By.XPATH, _risk.format(2)
+geolocation = By.XPATH, '/html/body/div[1]/div/div/section/div[4]/ul/li[8]/div/input'
+loading = By.CLASS_NAME, 'page-loading-container'
+submit_button = By.CSS_SELECTOR, '.footers > a'
+submit_confirm_button = By.CLASS_NAME, 'wapcf-btn-ok'
+result = By.CLASS_NAME, 'wapat-title'

+ 5 - 0
robot/url.py

@@ -0,0 +1,5 @@
+login = 'https://auth.bupt.edu.cn/authserver/login'
+logout = 'https://auth.bupt.edu.cn/authserver/logout'
+uc_index = 'https://uc.bupt.edu.cn/#/user/pc/index'
+ncov = 'https://app.bupt.edu.cn/ncov/wap/default/index'
+geolocation = 'https://www.runoob.com/try/try.php?filename=tryhtml5_geolocation'