downloader

一个简易的爬虫项目,用于下载网上的一些pdf资料。涉及到异步库,redis等知识。

asyncio

对asyncio只有一个基本认识,一个异步框架。主要包括Eventloop, Coroutine(协程), Future, Task四个基础概念。既然要做到异步,肯定涉及到事件循环调用,而Eventloop就相当与控制中心,负责循环调用注册在其中的协程。而coroutine(协程)是一种特殊的函数,特殊在于其可以交出调用权。Future是对Coroutine的再封装,而Task提供了一套接口方便开发者创建Future。协程可以通过yield实现交出调用权。
参考:
https://zhuanlan.zhihu.com/p/72887901
https://zhuanlan.zhihu.com/p/73568282
https://zhuanlan.zhihu.com/p/75193842

redis

redis是一个内存型数据库,为了业务需求提供了一些特定的数据结构,在一些业务场景使用广泛,这里主要是用redis的列表结构保存即将要访问的链接。在python中使用redis如下:

1
2
3
4
5
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=2, decode_responses=True)
# 创建一名为url_list的列表,并添加item
r.lpush('url_list', item)
# pop 一项 item
r.rpop('url_list')

另外本文使用redis实现 bloom filter,参见:https://github.com/HatBoy/BloomFilter
关于 bloom filter的原理,一张图就足够清晰了

由bloom filter的原理可知,错误率由数组的长度和 hash 函数数量决定,但在设定的时候这些东西很间接,没什么概率,但我们大概需要存多少数据,要控制在什么样的错误率下这些是直接的,于是根据这些直接的值计算出数组长度和hash函数个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#coding=UTF-8

import mmh3
import BitVector
import redis
import math
import time


class BloomFilter():
#内置100个随机种子
SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]

#capacity是预先估计要去重的数量
#error_rate表示错误率
#conn表示redis的连接客户端
#key表示在redis中的键的名字前缀
def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate)) #需要的总bit位数
self.k = math.ceil(math.log1p(2)*self.m/capacity) #需要最少的hash次数
self.mem = math.ceil(self.m/8/1024/1024) #需要的多少M内存
self.blocknum = math.ceil(self.mem/512) #需要多少个512M的内存块,value的第一个字符必须是ascii码,所有最多有256个内存块
self.seeds = self.SEEDS[0:self.k]
self.key = key
self.N = 2**31-1
self.redis = conn
if not self.redis:
#默认如果没有redis连接,在内存中使用512M的内存块去重
self.bitset = BitVector.BitVector(size=1<<32)
print(self.mem)
print(self.k)

def add(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
for hash in hashs:
if self.redis:
self.redis.setbit(name, hash, 1)
else:
self.bitset[hash] = 1

def is_exist(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
exist = True
for hash in hashs:
if self.redis:
exist = exist & self.redis.getbit(name, hash)
else:
exist = exist & self.bitset[hash]
return exist

def get_hashs(self, value):
hashs = list()
for seed in self.seeds:
hash = mmh3.hash(value, seed)
if hash >= 0:
hashs.append(hash)
else:
hashs.append(self.N - hash)
return hashs


pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=0)
conn = redis.StrictRedis(connection_pool=pool)

start = time.time()
bf = BloomFilter(conn=conn)
bf.add('test')
bf.add('fsest1')
print(bf.is_exist('qest'))
print(bf.is_exist('testdsad'))
end = time.time()
print(end-start)

downloader

以下创建一个简易爬虫,给定初始网址后该爬虫自动提取网页中的其他网址,并不断往下访问。如果链接是以pdf结尾则进行下载(如果要下载图片或其他资源原理相同)。这个版本代码没有考虑网页中的相对路径地址,以后有时间再写第二个版本吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import random

import aiohttp
import asyncio
from bs4 import BeautifulSoup as bs
import re
from urllib import parse
import os
import time
import requests
import redis
from hashlib import md5
import mmh3
from bitarray import bitarray

import BitVector
import redis
import math
import time

proxies = {'http': 'http://127.0.0.1:7890', 'https': 'http://127.0.0.1:7890'}


class BloomFilter():
#内置100个随机种子
SEEDS = [543, 460, 171, 876, 796, 607, 650, 81, 837, 545, 591, 946, 846, 521, 913, 636, 878, 735, 414, 372,
344, 324, 223, 180, 327, 891, 798, 933, 493, 293, 836, 10, 6, 544, 924, 849, 438, 41, 862, 648, 338,
465, 562, 693, 979, 52, 763, 103, 387, 374, 349, 94, 384, 680, 574, 480, 307, 580, 71, 535, 300, 53,
481, 519, 644, 219, 686, 236, 424, 326, 244, 212, 909, 202, 951, 56, 812, 901, 926, 250, 507, 739, 371,
63, 584, 154, 7, 284, 617, 332, 472, 140, 605, 262, 355, 526, 647, 923, 199, 518]

#capacity是预先估计要去重的数量
#error_rate表示错误率
#conn表示redis的连接客户端
#key表示在redis中的键的名字前缀
def __init__(self, capacity=1000000000, error_rate=0.00000001, conn=None, key='BloomFilter'):
self.m = math.ceil(capacity*math.log2(math.e)*math.log2(1/error_rate)) #需要的总bit位数
self.k = math.ceil(math.log1p(2)*self.m/capacity) #需要最少的hash次数
self.mem = math.ceil(self.m/8/1024/1024) #需要的多少M内存
self.blocknum = math.ceil(self.mem/512) #需要多少个512M的内存块,value的第一个字符必须是ascii码,所有最多有256个内存块
self.seeds = self.SEEDS[0:self.k]
self.key = key
self.N = 2**31-1
self.redis = conn
if not self.redis:
#默认如果没有redis连接,在内存中使用512M的内存块去重
self.bitset = BitVector.BitVector(size=1<<32)

def add(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
for hash in hashs:
if self.redis:
self.redis.setbit(name, hash, 1)
else:
self.bitset[hash] = 1

def is_exist(self, value):
name = self.key + "_" + str(ord(value[0])%self.blocknum)
hashs = self.get_hashs(value)
exist = True
for hash in hashs:
if self.redis:
exist = exist & self.redis.getbit(name, hash)
else:
exist = exist & self.bitset[hash]
return exist

def get_hashs(self, value):
hashs = list()
for seed in self.seeds:
hash = mmh3.hash(value, seed)
if hash >= 0:
hashs.append(hash)
else:
hashs.append(self.N - hash)
return hashs


class Downloader():
def __init__(self, start_url_list):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
self.r = redis.StrictRedis(host='127.0.0.1', port=6379, db=2, decode_responses=True)
self.bf = BloomFilter(conn=self.r)
for item in start_url_list:
self.r.lpush('url_list', item)

# 根据url请求网址并返回html
async def get_html(self, url, header=None):
sem = asyncio.Semaphore(100) # 并发数量限制
try:
async with sem:
async with aiohttp.ClientSession(headers=header, cookies='') as session:
async with session.get(url, proxy='http://127.0.0.1:7890') as resp:
if resp.status in [200, 201]:
data = await resp.text()
return data
else:
return None
except:
return None

# 获取网页中的pdf链接
def getPDF(self, content, base_url):
pdf = r"href.*\.pdf\""
pdflist = re.findall(pdf, content)
fianl_url_list = []
# 进行相对路径--> 绝对路径转换
for item in pdflist:
item_url = item[6:-1]
new_full_url = parse.urljoin(base_url, item_url)
fianl_url_list.append(new_full_url)
return fianl_url_list

# 根据pdf链接下载
def downloadPDF(self, item):
filename = os.path.basename(parse.unquote(item))
cop = re.compile("[^\u4e00-\u9fa5^a-z^A-Z^0-9^\.]") # 匹配不是中文、大小写、数字的其他字符
filename = cop.sub('', filename) # 将string1中匹配到的字符替换成空字符
salt = ''.join(["{}".format(random.randint(0, 9)) for num in range(0, 5)])
filename = salt + filename
try:
responsepdf = requests.get(item, proxies=proxies)
if responsepdf.status_code == 200:
with open(r"E:/pdf/%s" % filename, "wb") as code:
code.write(responsepdf.content)
time.sleep(1) # 防止访问速度过快,可以灵活的调整时间
except:
return

# 获取网页中的链接
def getAllhref(self, html, base_url):
# 使用BeautifulSoup函数解析传入的html
soup = bs(html, features="lxml")
allnode_of_a = soup.find_all("a")
result = [_.get("href").strip() for _ in allnode_of_a if _ is not None and _.get("href") is not None]
# 有些是相对路径,需要转换为绝对路径
filterResult = []
for item in result:
if len(item) > 0:
if item.startswith("http"):
filterResult.append(item)
else:
new_full_url = parse.urljoin(base_url, item)
filterResult.append(new_full_url)
return filterResult

# 传入一个 redis list,开始从里面取出一个url并请求数据
async def start_spider(self):
while True:
cur_url = self.r.rpop('url_list')
if cur_url is None:
break
if self.bf.is_exist(cur_url):
continue
response = await self.get_html(url=cur_url, header=self.headers)
# 将访问过的url加入到bloom filter
self.bf.add(cur_url)
if response is not None:
pdflist = self.getPDF(response, cur_url)
for item in pdflist:
self.downloadPDF(item)
nextList = self.getAllhref(response, cur_url)
for url in nextList:
self.r.lpush('url_list', url)



async def main():
start_url_list = ["https://github.com/Kensuke-Hinata/statistic/tree/master/os/books"]
downloader = Downloader(start_url_list)
await downloader.start_spider()


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())