An Auto-Sync Script for UCAS Courseware

Cool Idea

Two weeks into my first year of grad school at UCAS, I noticed that teachers upload their slides to the course website, one lecture at a time. So every time a teacher updates the slides, we have to open the page and check whether anything is new. (There are email notifications, but I ignore those outright.) Hence the idea of writing a script to download everything automatically.

Development Process and Difficulties

The first step was packet-capture analysis. Logging in to SEP is trivial: the form simply POSTs the username, the password, and an sb=sb parameter.
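Distilled from the capture, the login step alone looks like this (a minimal sketch; the username and password are placeholders, and the full version is in the Code section below):

import requests

session = requests.session()
# The exact three fields observed in the captured POST, including the odd sb=sb
session.post('http://sep.ucas.ac.cn/slogin',
             data={'userName': 'your_username', 'pwd': 'your_password', 'sb': 'sb'})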

Once inside SEP, the system hands a special key to each subsystem (course selection, the course website, and so on). Capture that key and you can walk straight into the course site.
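Capturing the key boils down to one regex over the SEP portal page, then presenting it to course.ucas.ac.cn (again a sketch, reusing the session above; the working version is _get_course_page in the listing below):

import re

# The portal page embeds a one-off Identity key for the course system
r = session.get('http://sep.ucas.ac.cn/portal/site/16/801')
identity = re.findall(r'"http://course.ucas.ac.cn/portal/plogin\?Identity=(.*)"', r.text)[0]
# Visiting plogin with the key signs the session in to course.ucas.ac.cn
session.get('http://course.ucas.ac.cn/portal/plogin?Identity=' + identity)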

The next step was to analyze the courseware URLs of each course. Points to watch out for:

  • Some teachers post an HTML link that has to be followed through a redirect, and dead links have to be handled.
  • Folders have to be downloaded recursively. (You cannot simply test for a '.' in the name: some teachers upload files with no extension at all, so who knows what they are.)
  • The download module is multithreaded, so creating a directory when the path does not exist yet has to be done under a lock.
  • Files that already exist locally should not be downloaded again.
  • CAPTCHA handling (updated 2017.2, when a CAPTCHA was added; see the sketch after this list).
  • Once the second semester started, only the current semester's courses should be checked (the notion of a term).
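The listing below is the original 2016 version and predates the CAPTCHA. For completeness, a minimal sketch of the manual-entry approach (the image URL here is hypothetical; take the real one from your own packet capture):

def ask_captcha(session):
    # Hypothetical image endpoint: replace with the URL seen in the login form
    img = session.get('http://sep.ucas.ac.cn/captcha_image').content
    with open('captcha.jpg', 'wb') as f:
        f.write(img)
    # Crudest possible "recognition": let the user read the saved image
    return input('Type the characters shown in captcha.jpg: ')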

On top of that, I packaged the script into an exe with PyInstaller.
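Packaging is a one-liner, assuming the script is saved as download.py:

pyinstaller -F download.py    # -F (--onefile) bundles everything into a single exe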


Code

The code is below; for the complete code and subsequent updates, see GitHub: UCAS 课件自动下载.

# -*- coding: utf-8 -*-
# @Date    : 2016/9/9
# @Author  : hrwhisper
import codecs
import re
import os
import multiprocessing
from multiprocessing.dummy import Pool
from datetime import datetime
import urllib.parse
import requests
from bs4 import BeautifulSoup


def read_file():
    # The "private" file stores the username, password and save path, one per line
    with codecs.open(r'./private', "r", "utf-8") as f:
        username, password, save_path = f.read().strip().split("\n")
    return username, password, save_path


class Ucas(object):
    username, password, save_base_path = read_file()

    def __init__(self, time_out=10):
        self.__BEAUTIFULSOUPPARSE = 'html5lib'  # or use 'lxml'
        self.session = requests.session()
        self.headers = {
            "Host": "sep.ucas.ac.cn",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, sdch",
            "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.4",
        }
        self.course_list = []
        self.to_download = []
        self.lock = multiprocessing.Lock()
        self._time_out = time_out

    def _login_sep(self):
        # Log in to SEP
        print('Login....')
        url = "http://sep.ucas.ac.cn/slogin"
        post_data = {
            "userName": self.username,
            "pwd": self.password,
            "sb": "sb"
        }
        html = self.session.post(url, data=post_data, headers=self.headers).text
        result = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('div', class_='alert alert-error')
        if result:
            raise ValueError('Wrong username or password')

    def _get_course_page(self):
        # Fetch the Identity key from SEP to sign in to the course system, then navigate to the course list page
        url = "http://sep.ucas.ac.cn/portal/site/16/801"
        r = self.session.get(url, headers=self.headers)
        code = re.findall(r'"http://course.ucas.ac.cn/portal/plogin\?Identity=(.*)"', r.text)[0]

        url = "http://course.ucas.ac.cn/portal/plogin?Identity=" + code
        self.headers['Host'] = "course.ucas.ac.cn"
        html = self.session.get(url, headers=self.headers).text
        # Walk the frames: mainFrame -> the sakai-membership link -> its iframe, which lists all courses
        url = 'http://course.ucas.ac.cn' + \
              BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('frame', title='mainFrame')['src']
        html = self.session.get(url, headers=self.headers).text
        url = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('a', class_='icon-sakai-membership')['href']
        html = self.session.get(url, headers=self.headers).text
        url = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('iframe')['src']
        html = self.session.get(url, headers=self.headers).text
        return html

    def _parse_course_list(self):
        # Collect the URL of every course
        html = self._get_course_page()
        self.course_list = ['http://course.ucas.ac.cn/portal/site/' + x for x in
                            re.findall(r'http://course.ucas.ac.cn/portal/site/([\S]+)"', html)]

    def _get_all_resource_url(self):
        # From every course URL, collect the URLs of all its courseware
        print('Reading courseware lists...')
        base_url = 'http://course.ucas.ac.cn/access/content/group/'
        urls = [base_url + x.split('/')[-1] + '/' for x in self.course_list]
        list(map(self._get_resource_url, urls))

    def _get_resource_url(self, base_url, _path='', source_name=None):
        html = self.session.get(base_url, headers=self.headers).text
        tds = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find_all('td')
        if not source_name:
            source_name = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('h2').text
        res = set()
        for td in tds:
            url = td.find('a')
            if not url: continue
            url = urllib.parse.unquote(url['href'])
            if url == '../': continue
            if 'Folder' in td.text:  # a sub-directory: recurse into it
                self._get_resource_url(base_url + url, _path + '/' + url, source_name)
            elif url.startswith('http:__'):  # Fix can't download when given a web link. eg: 计算机算法分析与设计
                try:
                    res.add((self.session.get(base_url + url, headers=self.headers, timeout=self._time_out).url, _path))
                except requests.exceptions.ReadTimeout:
                    print("Error-----------: ", base_url + url, "could not be added to the download list, the server did not respond")
            else:
                res.add((base_url + url, _path))

        for url, _path in res:
            self.to_download.append((source_name, _path, url))

    def _start_download(self):
        # Download everything with a thread pool
        p = Pool()
        p.map(self._download_file, self.to_download)
        p.close()
        p.join()

    def _download_file(self, param):
        # Download a single file
        dic_name, sub_directory, url = param
        save_path = self.save_base_path + '/' + dic_name + '/' + sub_directory
        with self.lock:  # creating directories is racy across threads, so do it under the lock
            if not os.path.exists(save_path):
                os.makedirs(save_path)

        filename = url.split('/')[-1]
        save_path += '/' + filename
        if not os.path.exists(save_path):  # skip files that already exist locally
            try:
                r = self.session.get(url, stream=True, timeout=self._time_out)
            except requests.exceptions.ReadTimeout:
                print('Error-----------download failed, the server did not respond: ', save_path)
                return
            size_mb = int(r.headers.get('Content-Length', 0)) / (1024 ** 2)
            print('Start download {dic_name}  >> {sub_directory}{filename}  {size_mb:.2f}MB'.format(**locals()))
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)
                        f.flush()
            print('{dic_name}  >> {sub_directory}{filename}   Download success'.format(**locals()))

    def start(self):
        try:
            self._login_sep()
            self._parse_course_list()
            self._get_all_resource_url()
            self._start_download()
        except ValueError as e:
            print(e, 'please check the private file')


if __name__ == '__main__':
    start = datetime.now()
    s = Ucas()
    s.start()
    print('Task complete, total time:', datetime.now() - start)
    os.system("pause")
