0%

UCAS自动同步课件脚本

Cool Idea

在UCAS进入研一的生活接近两个礼拜,发现老师的课件基本上都上传到课程网站,然后是一节一节给的。这样,每次老师更新课件的时候,我们都得开一下网页,然后看看是不是更新了?(虽然有邮箱提醒,但是这种邮件我都直接忽略的),于是萌生了写个脚本来自动下载的想法。

创作过程及难点

首先是进行抓包分析。登录SEP很简单,直接发送了用户密码还有sb=sb参数。

进入SEP后,该系统采用一个特殊的key传递到另个一个系统(比如选课,课程网站等),捕获这个key之后,就可以进入课程网站啦。

然后分别分析每门课的课件的URL。需要注意的是:

  • 有的老师是给出HTML链接,需要进行重定向,而且需要解决给的链接挂掉的情况。
  • 文件夹需要递归下载。(不能单纯的用'.'来判断,有的老师就是没有扩展名的,不知道上传的是什么鬼)
  • 在下载模块中,使用的是多线程,在多线程中,需要注意创建文件夹路径不存在时进行创建需要加锁。
  • 对于已经存在的文件,这不要进行下载了
  • 验证码的识别(2017.2更新,多了验证码)

  • 进入第二学期后,只查看第二学期的课程(学期的概念)

此外,还用PyInstaller打包成了exe文件。

Code

代码如下:完整代码以及后续更新请见github: UCAS 课件自动下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-
# @Date : 2016/9/9
# @Author : hrwhisper
import codecs
import re
import os
import multiprocessing
from multiprocessing.dummy import Pool
from datetime import datetime
import urllib.parse
import requests
from bs4 import BeautifulSoup

def read_file():
with codecs.open(r'./private', "r", "utf-8") as f:
res = f.read().split("\n")
username, password, save_path = res
return username, password, save_path

class Ucas(object):
username, password, save_base_path = read_file()

def __init__(self, time_out=10):
self.__BEAUTIFULSOUPPARSE = 'html5lib' # or use 'lxml'
self.session = requests.session()
self.headers = {
"Host": "sep.ucas.ac.cn",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Encoding": "gzip, deflate, sdch",
"Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.6,en;q=0.4",
}
self.course_list = []
self.to_download = []
self.lock = multiprocessing.Lock()
self._time_out = time_out

def _login_sep(self):
# 登录sep
print('Login....')
url = "http://sep.ucas.ac.cn/slogin"
post_data = {
"userName": self.username,
"pwd": self.password,
"sb": "sb"
}
html = self.session.post(url, data=post_data, headers=self.headers).text
result = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('div', class_='alert alert-error')
if result: raise ValueError('用户名或者密码错误')

def _get_course_page(self):
# 从sep中获取Identity Key来登录课程系统,并获取课程信息
url = "http://sep.ucas.ac.cn/portal/site/16/801"
r = self.session.get(url, headers=self.headers)
code = re.findall(r'"http://course.ucas.ac.cn/portal/plogin\?Identity=(.*)"', r.text)[0]

url = "http://course.ucas.ac.cn/portal/plogin?Identity=" + code
self.headers['Host'] = "course.ucas.ac.cn"
html = self.session.get(url, headers=self.headers).text
url = 'http://course.ucas.ac.cn' + \
BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('frame', title='mainFrame')['src']
html = self.session.get(url, headers=self.headers).text
url = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('a', class_='icon-sakai-membership')['href']
html = self.session.get(url, headers=self.headers).text
url = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('iframe')['src']
html = self.session.get(url, headers=self.headers).text
return html

def _parse_course_list(self):
# 获取课程的所有URL
html = self._get_course_page()
self.course_list = ['http://course.ucas.ac.cn/portal/site/' + x for x in
re.findall(r'http://course.ucas.ac.cn/portal/site/([\S]+)"', html)]

def _get_all_resource_url(self):
# 从课程的所有URL中获取对应的所有课件
print('读取课件中......')
base_url = 'http://course.ucas.ac.cn/access/content/group/'
urls = [base_url + x.split('/')[-1] + '/' for x in self.course_list]
list(map(self._get_resource_url, urls))

def _get_resource_url(self, base_url, _path='', source_name=None):
html = self.session.get(base_url, headers=self.headers).text
tds = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find_all('td')
if not source_name:
source_name = BeautifulSoup(html, self.__BEAUTIFULSOUPPARSE).find('h2').text
res = set()
for td in tds:
url = td.find('a')
if not url: continue
url = urllib.parse.unquote(url['href'])
if url == '../': continue
if 'Folder' in td.text: # directory
self._get_resource_url(base_url + url, _path + '/' + url, source_name)
if url.startswith('http:__'): # Fix can't download when given a web link. eg: 计算机算法分析与设计
try:
res.add((self.session.get(base_url + url, headers=self.headers, timeout=self._time_out).url, _path))
except requests.exceptions.ReadTimeout:
print("Error-----------: ", base_url + url, "添加进下载路径失败,服务器长时间无响应")
else:
res.add((base_url + url, _path))

for url, _path in res:
self.to_download.append((source_name, _path, url))

def _start_download(self):
# 多线程下载
p = Pool()
p.map(self._download_file, self.to_download)
p.close()
p.join()

def _download_file(self, param):
# 下载文件
dic_name, sub_directory, url = param
save_path = self.save_base_path + '/' + dic_name + '/' + sub_directory
if self.lock.acquire():
if not os.path.exists(save_path): # To create directory
os.makedirs(save_path)
self.lock.release()

filename = url.split('/')[-1]
save_path += '/' + filename
if not os.path.exists(save_path): # To prevent download exists files
try:
r = self.session.get(url, stream=True, timeout=self._time_out)
except requests.exceptions.ReadTimeout as e:
print('Error-----------文件下载失败,服务器长时间无响应: ', save_path)
size_mb = int(r.headers.get('Content-Length')) / (1024 ** 2)
print('Start download {dic_name} >> {sub_directory}{filename} {size_mb:.2f}MB'.format(**locals()))
with open(save_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
f.flush()
print('{dic_name} >> {sub_directory}{filename} Download success'.format(**locals()))

def start(self):
try:
self._login_sep()
self._parse_course_list()
self._get_all_resource_url()
self._start_download()
except ValueError as e:
print(e, '请检查private文件')

if __name__ == '__main__':
start = datetime.now()
s = Ucas()
s.start()
print('Task complete, total time:', datetime.now() - start)
os.system("pause")
请我喝杯咖啡吧~