您现在的位置是:首页 > 学无止境
如何构建一个分布式爬虫
http://python.jobbole.com/87823/
http://python.jobbole.com/87826/
http://python.jobbole.com/87833/
理论篇
前言
本系列文章计划分三个章节进行讲述,分别是理论篇、...
转载自:
http://python.jobbole.com/87823/
http://python.jobbole.com/87826/
http://python.jobbole.com/87833/
理论篇
前言
本系列文章计划分三个章节进行讲述,分别是理论篇、基础篇和实战篇。理论篇主要为构建分布式爬虫而储备的理论知识,基础篇会基于理论篇的知识写一个简易的分布式爬虫,实战篇则会以微博为例,教大家做一个比较完整且足够健壮的分布式微博爬虫。通过这三篇文章,希望大家能掌握如何构建一个分布式爬虫的方法;能举一反三,将celery
用于除爬虫外的其它场景。目前基本上的博客都是教大家使用scrapyd或者scrapy-redis构建分布式爬虫,本系列文章会从另外一个角度讲述如何用requests+celery构建一个健壮的、可伸缩并且可扩展的分布式爬虫。
本系列文章属于爬虫进阶文章,期望受众是具有一定Python基础知识和编程能力、有爬虫经验并且希望提升自己的同学。小白要是感兴趣,也可以看看,看不懂的话,可以等有了一定基础和经验后回过头来再看。
另外一点说明,本系列文章不是旨在构建一个分布式爬虫框架或者分布式任务调度框架,而是利用现有的分布式任务调度工具来实现分布式爬虫,所以请轻喷。
分布式爬虫概览
- 何谓分布式爬虫?
通俗的讲,分布式爬虫就是多台机器多个 spider 对多个 url 的同时处理问题,分布式的方式可以极大提高程序的抓取效率。 - 构建分布式爬虫通畅需要考虑的问题
(1)如何能保证多台机器同时抓取同一个URL?
(2)如果某个节点挂掉,会不会影响其它节点,任务如何继续?
(3)既然是分布式,如何保证架构的可伸缩性和可扩展性?不同优先级的抓取任务如何进行资源分配和调度?
基于上述问题,我选择使用celery作为分布式任务调度工具,是分布式爬虫中任务和资源调度的核心模块。它会把所有任务都通过消息队列发送给各个分布式节点进行执行,所以可以很好的保证url不会被重复抓取;它在检测到worker挂掉的情况下,会尝试向其他的worker重新发送这个任务信息,这样第二个问题也可以得到解决;celery自带任务路由,我们可以根据实际情况在不同的节点上运行不同的抓取任务(在实战篇我会讲到)。本文主要就是带大家了解一下celery的方方面面(有celery相关经验的同学和大牛可以直接跳过了)
Celery知识储备
celery基础讲解
按celery官网的介绍来说
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。它是一个专注于实时处理的任务队列,同时也支持任务调度。
下面几个关于celery的核心知识点
- broker:翻译过来叫做中间人。它是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,执行相应程序。这其实就是消费者和生产者之间的桥梁。
- backend: 通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。
- worker: Celery类的实例,作用就是执行各种任务。注意在celery3.1.25后windows是不支持celery worker的!
- producer: 发送任务,将其传递给broker
- beat: celery实现的定时任务。可以将其理解为一个producer,因为它也是通过网络调用定时将任务发送给worker执行。注意在windows上celery是不支持定时任务的!
下面是关于celery的架构示意图,结合上面文字的话应该会更好理解
由于celery只是任务队列,而不是真正意义上的消息队列,它自身不具有存储数据的功能,所以broker和backend需要通过第三方工具来存储信息,celery官方推荐的是 RabbitMQ和Redis,另外mongodb等也可以作为broker或者backend,可能不会很稳定,我们这里选择Redis作为broker兼backend。
关于redis的安装和配置可以查看这里
实际例子
先安装celery
1
|
pip
install celery
|
我们以官网给出的例子来做说明,并对其进行扩展。首先在项目根目录下,这里我新建一个项目叫做celerystudy
,然后切换到该项目目录下,新建文件tasks.py
,然后在其中输入下面代码
1
2
3
4
5
6
7
|
from
celery import
Celery
app
= Celery('tasks',
broker='redis://:''@223.129.0.190:6379/2',
backend='redis://:''@223.129.0.190:6379/3')
@app.task
def add(x,
y):
return
x +
y
|
这里我详细讲一下代码:我们先通过app=Celery()
来实例化一个celery对象,在这个过程中,我们指定了它的broker,是redis的db 2,也指定了它的backend,是redis的db3, broker和backend的连接形式大概是这样
1
|
redis://:password@hostname:port/db_number
|
然后定义了一个add
函数,重点是@app.task
,它的作用在我看来就是将add()
注册为一个类似服务的东西,本来只能通过本地调用的函数被它装饰后,就可以通过网络来调用。这个tasks.py
中的app就是一个worker。它可以有很多任务,比如这里的任务函数add
。我们再通过在命令行切换到项目根目录,执行
1
|
celery
-A
tasks worker
-l
info
|
启动成功后就是下图所示的样子
这里我说一下各个参数的意思,-A
指定的是app(即Celery实例)所在的文件模块,我们的app是放在tasks.py
中,所以这里是
tasks
;worker表示当前以worke
r的方式运行,难道还有别的方式?对的,比如运行定时任务就不用指定worker
这个关键字;
-l info
表示该worker节点的日志等级是info
,更多关于启动worker的参数(比如-c
、-Q
等常用的)请使用
1
|
celery
worker --help
|
进行查看
将worker启动起来后,我们就可以通过网络来调用add
函数了。我们在后面的分布式爬虫构建中也是采用这种方式分发和消费url的。在命令行先切换到项目根目录,然后打开python交互端
1
2
|
from
tasks import
add
rs =
add.delay(2,
2)
# 这里的add.delay就是通过网络调用将任务发送给add所在的worker执行
|
这个时候我们可以在worker的界面看到接收的任务和计算的结果。
1
2
3
|
[2017-05-19
14:22:43,038:
INFO/MainProcess]
Received task:
tasks.add[c0dfcd0b-d05f-4285-b944-0a8aba3e7e61]
# worker接收的任务
[2017-05-19
14:22:43,065:
INFO/MainProcess]
Task tasks.add[c0dfcd0b-d05f-4285-b944-0a8aba3e7e61]
succeeded in
0.025274309000451467s:
4 # 执行结果
|
这里是异步调用,如果我们需要返回的结果,那么要等rs
的ready
状态true
才行。这里add
看不出效果,不过试想一下,如果我们是调用的比较占时间的io任务,那么异步任务就比较有价值了
1
2
3
4
5
6
7
8
9
10
11
12
13
|
rs
# <AsyncResult: c0dfcd0b-d05f-4285-b944-0a8aba3e7e61>
rs.ready()
# true 表示已经返回结果了
rs.status
# 'SUCCESS' 任务执行状态,失败还是成功
rs.successful()
# True 表示执行成功
rs.result
# 4 返回的结果
rs.get()
# 4 返回的结果
<celery.backends.redis.RedisBackend
object at
0x30033ec>
#这里我们backend 结果存储在redis里
|
上面讲的是从Python交互终端中调用add
函数,如果我们要从另外一个py文件调用呢?除了通过import
然后add.delay()
这种方式,我们还可以通过send_task()
这种方式,我们在项目根目录另外新建一个py文件叫做
excute_tasks.py
,在其中写下如下的代码
1
2
3
4
|
from
tasks import
add
if
__name__ ==
'__main__':
add.delay(5,
10)
|
这时候可以在celery的worker界面看到执行的结果
1
2
|
[2017-05-19
14:25:48,039:
INFO/MainProcess]
Received task:
tasks.add[f5ed0d5e-a337-45a2-a6b3-38a58efd9760]
[2017-05-19
14:25:48,074:
INFO/MainProcess]
Task tasks.add[f5ed0d5e-a337-45a2-a6b3-38a58efd9760]
succeeded in
0.03369094600020617s:
15
|
此外,我们还可以通过send_task()
来调用,将excute_tasks.py
改成这样
1
2
3
|
from
tasks import
app
if __name__
== '__main__':
app.send_task('tasks.add',
args=(10,
15),)
|
这种方式也是可以的。send_task()
还可能接收到为注册(即通过@app.task
装饰)的任务,这个时候worker会忽略这个消息
[2017-05-19 14:34:15,352: ERROR/MainProcess] Received unregistered task of type ‘tasks.adds’.
The message has been ignored and discarded.
定时任务
上面部分讲了怎么启动worker和调用worker的相关函数,这里再讲一下celery的定时任务。
爬虫由于其特殊性,可能需要定时做增量抓取,也可能需要定时做模拟登陆,以防止cookie过期,而celery恰恰就实现了定时任务的功能。在上述基础上,我们将tasks.py
文件改成如下内容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from
celery import
Celery
app =
Celery('add_tasks',
broker='redis:''//223.129.0.190:6379/2',
backend='redis:''//223.129.0.190:6379/3')
app.conf.update(
# 配置所在时区
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
# 官网推荐消息序列化方式为json
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
# 配置定时任务
CELERYBEAT_SCHEDULE={
'my_task':
{
'task':
'tasks.add', # tasks.py模块下的add方法
'schedule':
60, # 每隔60运行一次
'args':
(23,
12),
}
}
)
@app.task
def
add(x,
y):
return
x +
y
|
然后先通过ctrl+c
停掉前一个worker,因为我们代码改了,需要重启worker才会生效。我们再次以celery -A tasks worker -l info
这个命令开启worker。
这个时候我们只是开启了worker,如果要让worker执行任务,那么还需要通过beat给它定时发送,我们再开一个命令行,切换到项目根目录,通过
1
|
celery
beat -A
tasks -l
info
|
1
2
3
4
5
6
7
8
9
10
|
celery
beat v3.1.25
(Cipater)
is starting.
__ - ...
__ - _
Configuration
->
.
broker ->
redis://223.129.0.190:6379/2
.
loader ->
celery.loaders.app.AppLoader
.
scheduler ->
celery.beat.PersistentScheduler
.
db ->
celerybeat-schedule
.
logfile ->
[stderr]@%INFO
.
maxinterval ->
now (0s)
[2017-05-19
15:56:57,125:
INFO/MainProcess]
beat:
Starting...
|
这样就表示定时任务已经开始运行了。
眼尖的同学可能看到我这里celery的版本是3.1.25
,这是因为celery支持的windows
最高版本是3.1.25。由于我的分布式微博爬虫的worker也同时部署在了windows上,所以我选择了使用
3.1.25
。如果全是linux系统,建议使用celery4。
此外,还有一点需要注意,在celery4后,定时任务(通过schedule调度的会这样,通过crontab调度的会马上执行)会在当前时间再过定时间隔执行第一次任务,比如我这里设置的是60秒的间隔,那么第一次执行add
会在我们通过celery beat -A tasks -l info
启动定时任务后60秒才执行;celery3.1.25则会马上执行该任务。
关于定时任务更详细的请看官方文档celery定时任务
至此,我们把构建一个分布式爬虫的理论知识都讲了一遍,主要就是对于celery
的了解和使用,这里并未涉及到celery的一些高级特性,实战篇可能会讲解一些我自己使用的特性。
下一篇我将介绍如何使用celery写一个简单的分布式爬虫,希望大家能有所收获。
此外,打一个广告,我写了一个分布式的微博爬虫,主要就是利用celery做的分布式实战篇我也将会以该项目其中一个模块进行讲解,有兴趣的可以点击看看,也欢迎有需求的朋友试用。
基础篇
继上篇我们谈论了Celery的基本知识后,本篇继续讲解如何一步步使用Celery构建分布式爬虫。这次我们抓取的对象定为celery官方文档。
首先,我们新建目录distributedspider
,然后再在其中新建文件workers.py
,里面内容如下
1
2
3
4
5
6
7
8
9
10
|
from
celery import
Celery
app =
Celery('crawl_task',
include=['tasks'],
broker='redis://223.129.0.190:6379/1',
backend='redis://223.129.0.190:6379/2')
# 官方推荐使用json作为消息序列化方式
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
|
上述代码主要是做Celery实例的初始化工作,include
是在初始化celery app的时候需要引入的内容,主要就是注册为网络调用的函数所在的文件。然后我们再编写任务函数,新建文件tasks.py
,内容如下
1
2
3
4
5
6
7
8
9
|
import
requests
from bs4
import BeautifulSoup
from
workers import
app
@app.task
def
crawl(url):
print('正在抓取链接{}'.format(url))
resp_text
= requests.get(url).text
soup
= BeautifulSoup(resp_text,
'html.parser')
return
soup.find('h1').text
|
它的作用很简单,就是抓取指定的url,并且把标签为h1
的元素提取出来
最后,我们新建文件task_dispatcher.py
,内容如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from
workers import
app
url_list =
[
'http://docs.celeryproject.org/en/latest/getting-started/introduction.html',
'http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html',
'http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html',
'http://docs.celeryproject.org/en/latest/getting-started/next-steps.html',
'http://docs.celeryproject.org/en/latest/getting-started/resources.html',
'http://docs.celeryproject.org/en/latest/userguide/application.html',
'http://docs.celeryproject.org/en/latest/userguide/tasks.html',
'http://docs.celeryproject.org/en/latest/userguide/canvas.html',
'http://docs.celeryproject.org/en/latest/userguide/workers.html',
'http://docs.celeryproject.org/en/latest/userguide/daemonizing.html',
'http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html'
]
def
manage_crawl_task(urls):
for
url in
urls:
app.send_task('tasks.crawl',
args=(url,))
if __name__
== '__main__':
manage_crawl_task(url_list)
|
这段代码的作用主要就是给worker发送任务,任务是tasks.crawl
,参数是url
(元祖的形式)
现在,让我们在节点A(hostname为resolvewang的主机)上启动worker
1
|
celery
-A
workers worker
-c
2 -l
info
|
这里 -c
指定了线程数为2, -l
表示日志等级是info
。我们把代码拷贝到节点B(节点名为wpm的主机),同样以相同命令启动worker,便可以看到以下输出
可以看到左边节点(A)先是all alone
,表示只有一个节点;后来再节点B启动后,它便和B同步了
1
|
sync
with celery@wpm
|
这个时候,我们运行给这两个worker节点发送抓取任务
1
|
python
task_dispatcher.py
|
可以看到如下输出
可以看到两个节点都在执行抓取任务,并且它们的任务不会重复。我们再在redis里看看结果
可以看到一共有11条结果,说明 tasks.crawl
中返回的数据都在db2(backend)中了,并且以json的形式存储了起来,除了返回的结果,还有执行是否成功等信息。
到此,我们就实现了一个很基础的分布式网络爬虫,但是它还不具有很好的扩展性,而且貌似太简单了…下一篇我将以微博数据采集为例来演示如何构建一个稳健的分布式网络爬虫。
实战篇
本篇文章将是『如何构建一个分布式爬虫』系列文章的最后一篇,拟从实战角度来介绍如何构建一个稳健的分布式微博爬虫。这里我没敢谈高效,抓过微博数据的同学应该都知道微博的反爬虫能力,也知道微博数据抓取的瓶颈在哪里。我在知乎上看过一些同学的说法,把微博的数据抓取难度简单化了,我只能说,那是你太naive,没深入了解和长期抓取而已。
本文将会以PC端微博进行讲解,因为移动端微博数据不如PC短全面,而且抓取和解析难度都会小一些。文章比较长,由于篇幅所限,文章并没有列出所有代码,只是讲了大致流程和思路。
要抓微博数据,第一步便是模拟登陆,因为很多信息(比如用户信息,用户主页微博数据翻页等各种翻页)都需要在登录状态下才能查看。关于模拟登陆进阶,我写过两篇文章,一篇是模拟登陆微博的,是从小白的角度写的。另外一篇是模拟登陆百度云的,是从有一定经验的熟手的角度写的。读了这两篇文章,并且根据我写的过程自己动手实现过的同学,应该对于模拟登陆PC端微博是没有太大难度的。那两篇文章没有讲如何处理验证码,这里我简单说一下,做爬虫的同学不要老想着用什么机器学习的方法去识别复杂验证码,真的难度非常大,这应该也不是一个爬虫工程师的工作重点,当然这只是我的个人建议。工程化的项目,我还是建议大家通过打码平台来解决验证码的问题。我在分布式微博爬虫中就是直接调用打码平台的接口来做的大规模微博账号的模拟登陆,效果还不错,而且打码成本很低。
说完模拟登陆(具体请参见我写的那两篇文章,篇幅所限,我就不copy过来了),我们现在正式进入微博的数据抓取。这里我会以微博用户信息抓取为例来进行分析和讲解。
关于用户信息抓取,可能我们有两个目的。一个是我们只想抓一些指定用户,另外一个是我们想尽可能多的抓取更多数量的用户的信息。我的目的假定是第二种。那么我们该以什么样的策略来抓取,才能获得尽可能多的用户信息呢?如果我们初始用户选择有误,选了一些不活跃的用户,很可能会形成一个环,这样就抓不了太多的数据。这里有一个很简单的思路:我们把一些大V拿来做为种子用户,我们先抓他们的个人信息,然后再抓大V所关注的用户和粉丝,大V关注的用户肯定也是类似大V的用户,这样的话,就不容易形成环了。
策略我们都清楚了。就该是分析和编码了。
我们先来分析如何构造用户信息的URL。这里我以微博名为一起神吐槽
的博主为例进行分析。做爬虫的话,一个很重要的意识就是爬虫能抓的数据都是人能看到的数据,反过来,人能在浏览器上看到的数据,爬虫几乎都能抓。这里用的是几乎
,因为有的数据抓取难度特别。我们首先需要以正常人的流程看看怎么获取到用户的信息。我们先进入该博主的主页,如下图
点击查看更多,可以查看到该博主的具体信息
这里我们就看到了他的具体信息了。然后,我们看该页面的url构造
我直接copy的地址栏的url。这样做有啥不好的呢?对于老鸟来说,一下就看出来了,这样做的话,可能会导致信息不全,因为可能有些信息是动态加载的。所以,我们需要通过抓包来判断到底微博会通过该url返回所有信息,还是需要请求一些ajax 链接才会返回一些关键信息。这里我就重复一下我的观点:抓包很重要,抓包很重要,抓包很重要!重要的事情说三遍。关于抓包,我在模拟登陆微博和模拟登陆百度云都详细讲过了,这里我就不讲了。
我们抓完包,发现并没有ajax请求。那么可以肯定请求前面的url,会返回所有信息。我们通过点击鼠标右键,查看网页源代码,然后ctrl+a
、ctrl+c
将所有的页面源码保存到本地,这里我命名为personinfo.html
。我们用浏览器打开该文件,发现我们需要的所有信息都在这段源码中,这个工作和抓包判断数据是否全面有些重复,但是在我看来是必不可少的,因为我们解析页面数据的时候还可以用到这个html文件,如果我们每次都通过网络请求去解析内容的话,那么可能账号没一会儿就会被封了(因为频繁访问微博信息),所以我们需要把要解析的文件保存到本地。
从上面分析中我们可以得知
这个url就是获取用户数据的url。那么我们在只知道用户id的时候怎么构造它呢?我们可以多拿几个用户id来做测试,看构造是否有规律,比如我这里以用户名为网易云音乐的用户做分析,发现它的用户信息页面构造如下
这个就和上面那个不同了。但是我们仔细观察,可以发现上面那个是个人用户,下面是企业微博用户。我们尝试一下把它们url格式都统一为第一种或者第二种的格式
这样会出现404,那么统一成上面那种呢?
这样子的话,它会被重定向到用户主页,而不是用户详细资料页。所以也就不对了。那么该以什么依据判断何时用第一种url格式,何时用第二种url格式呢?我们多翻几个用户,会发现除了100505
之外,还有100305
、100206
等前缀,那么我猜想这个应该可以区分不同用户。这个前缀在哪里可以得到呢?我们打开我们刚保存的页面源码,搜索100505
,可以发现
微博应该是根据这个来区分不同用户类型的。这里大家可以自己也可以试试,看不同用户的domain
是否不同。为了数据能全面,我也是做了大量测试,发现个人用户的domain是1005051
,作家是100305
,其他基本都是认证的企业号。前两个个人信息的url构造就是
后者的是
弄清楚了个人信息url的构造方式,但是还有一个问题。我们已知只有uid啊,没有domain啊。如果是企业号,我们通过domain=100505
会被重定向到主页,如果是作家等(domain=100305或者100306),也会被重定向主页。我们在主页把domain提取出来,再请求一次,不就能拿到用户详细信息了吗?
关于如何构造获取用户信息的url的相关分析就到这里了。因为我们是在登录的情况下进行数据抓取的,可能在抓取的时候,某个账号突然就被封了,或者由于网络原因,某次请求失败了,该如何处理?对于前者,我们需要判断每次请求返回的内容是否符合预期,也就是看response url是否正常,看response content是否是404或者让你验证手机号等,对于后者,我们可以做一个简单的重试策略,大概代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
@timeout_decorator
def get_page(url,
user_verify=True,
need_login=True):
"""
:param url: 待抓取url
:param user_verify: 是否为可能出现验证码的页面(ajax连接不会出现验证码,如果是请求微博或者用户信息可能出现验证码),否为抓取转发的ajax连接
:param need_login: 抓取页面是否需要登录,这样做可以减小一些账号的压力
:return: 返回请求的数据,如果出现404或者403,或者是别的异常,都返回空字符串
"""
crawler.info('本次抓取的url为{url}'.format(url=url))
count
= 0
while
count <
max_retries:
if
need_login:
# 每次重试的时候都换cookies,并且和上次不同,如果只有一个账号,那么就允许相同
name_cookies
= Cookies.fetch_cookies()
if
name_cookies is
None:
crawler.warning('cookie池中不存在cookie,正在检查是否有可用账号')
rs
= get_login_info()
# 选择状态正常的账号进行登录,账号都不可用就停掉celery worker
if
len(rs)
== 0:
crawler.error('账号均不可用,请检查账号健康状况')
# 杀死所有关于celery的进程
if
'win32' in
sys.platform:
os.popen('taskkill
/F /IM "celery*"')
else:
os.popen('pkill
-f "celery"')
else:
crawler.info('重新获取cookie中...')
login.excute_login_task()
time.sleep(10)
try:
if
need_login:
resp
= requests.get(url,
headers=headers,
cookies=name_cookies[1],
timeout=time_out,
verify=False)
if
"$CONFIG['islogin'] = '0'"
in resp.text:
crawler.warning('账号{}出现异常'.format(name_cookies[0]))
freeze_account(name_cookies[0],
0)
Cookies.delete_cookies(name_cookies[0])
continue
else:
resp
= requests.get(url,
headers=headers,
timeout=time_out,
verify=False)
page
= resp.text
if
page:
page
= page.encode('utf-8',
'ignore').decode('utf-8')
else:
continue
# 每次抓取过后程序sleep的时间,降低封号危险
time.sleep(interal)
if
user_verify:
if
'unfreeze' in
resp.url
or 'accessdeny'
in resp.url
or 'userblock'
in resp.url
or is_403(page):
crawler.warning('账号{}已经被冻结'.format(name_cookies[0]))
freeze_account(name_cookies[0],
0)
Cookies.delete_cookies(name_cookies[0])
count
+= 1
continue
if
'verifybmobile'
in resp.url:
crawler.warning('账号{}功能被锁定,需要手机解锁'.format(name_cookies[0]))
freeze_account(name_cookies[0],
-1)
Cookies.delete_cookies(name_cookies[0])
continue
if
not is_complete(page):
count
+= 1
continue
if
is_404(page):
crawler.warning('url为{url}的连接不存在'.format(url=url))
return
''
except
(requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
AttributeError)
as e:
crawler.warning('抓取{}出现异常,具体信息是{}'.format(url,
e))
count
+= 1
time.sleep(excp_interal)
else:
Urls.store_crawl_url(url,
1)
return
page
crawler.warning('抓取{}已达到最大重试次数,请在redis的失败队列中查看该url并检查原因'.format(url))
Urls.store_crawl_url(url,
0)
return
''
|
这里大家把上述代码当一段伪代码读就行了,主要看看如何处理抓取时候的异常。因为如果贴整个用户抓取的代码,不是很现实,代码量有点大。
下面讲页面解析的分析。有一些做PC端微博信息抓取的同学,可能曾经遇到过这么个问题:保存到本地的html文件打开都能看到所有信息啊,为啥在页面源码中找不到呢?因为PC端微博页面的关键信息都是像下图这样,被FM.view()
包裹起来的,里面的数据可能被json encode过。
那么这么多的FM.view()
,我们怎么知道该提取哪个呢?这里有一个小技巧,由于只有中文会被编码,英文还是原来的样子,所以我们可以看哪段script中包含了渲染后的页面中的字符,那么那段应该就可能包含所有页面信息。我们这里以顶部的头像为例,如图
我们在页面源码中搜索,只发现一个script中有该字符串,那么就是那段script是页面相关信息。我们可以通过正则表达式把该script提取出来,然后把其中的html
也提取出来,再保存到本地,看看信息是否全面。这里我就不截图了。感觉还有很多要写的,不然篇幅太长了。
另外,对于具体页面的解析,我也不做太多的介绍了。太细的东西还是建议读读源码。我只讲一下,我觉得的一种处理异常的比较优雅的方式。微博爬虫的话,主要是页面样式太多,如果你打算包含所有不同的用户的模版,那么我觉得几乎不可能,不同用户模版,用到的解析规则就不一样。那么出现解析异常如何处理?尤其是你没有catch到的异常。很可能因为这个问题,程序就崩掉。其实对于Python这门语言来说,我们可以通过 装饰器 来捕捉我们没有考虑到的异常,比如我这个装饰器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
def
parse_decorator(return_type):
"""
:param return_type: 用于捕捉页面解析的异常, 0表示返回数字0, 1表示返回空字符串, 2表示返回[],3表示返回False, 4表示返回{}, 5返回None
:return: 0,'',[],False,{},None
"""
def
page_parse(func):
@wraps(func)
def
handle_error(*keys):
try:
return
func(*keys)
except
Exception as
e:
parser.error(e)
if
return_type ==
5:
return
None
elif
return_type ==
4:
return
{}
elif
return_type ==
3:
return
False
elif
return_type ==
2:
return
[]
elif
return_type ==
1:
return
''
else:
return
0
return
handle_error
return
page_parse
|
上面的代码就是处理解析页面发生异常的情况,我们只能在数据的准确性、全面性和程序的健壮性之间做一些取舍。用装饰器的话,程序中不用写太多的 try
语句,代码重复率也会减少很多。
页面的解析由于篇幅所限,我就讲到这里了。没有涉及太具体的解析,其中一个还有一个比较难的点,就是数据的全面性,读者可以去多观察几个微博用户的个人信息,就会发现有的个人信息,有的用户有填写,有的并没有。解析的时候要考虑完的话,建议从自己的微博的个人信息入手,看到底有哪些可以填。这样可以保证几乎不会漏掉一些重要的信息。
最后,我再切合本文的标题,讲如何搭建一个分布式的微博爬虫。开发过程中,我们可以先就做单机单线程的爬虫,然后再改成使用celery的方式。这里这样做是为了方便开发和测试,因为你单机搭起来并且跑得通了,那么分布式的话,就很容易改了,因为celery的API使用本来就很简洁。
我们抓取的是用户信息和他的关注和粉丝uid。用户信息的话,我们一个请求大概能抓取一个用户的信息,而粉丝和关注我们一个请求可以抓取18个左右(因为这个抓的是列表),显然可以发现用户信息应该多占一些请求的资源。这时候就该介绍理论篇没有介绍的关于celery的一个高级特性了,它叫做任务路由。直白点说,它可以规定哪个分布式节点能做哪些任务,不能做哪些任务。它的存在可以让资源分配更加合理,分布式微博爬虫项目初期,就没有使用任务路由,然后抓了十多万条关注和分析,结果发现用户信息抓几万条,这就是资源分配得不合理。那么如何进行任务路由呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
# coding:utf-8
import os
from
datetime import
timedelta
from celery
import Celery
from
kombu import
Exchange,
Queue
from config.conf
import get_broker_or_backend
from
celery import
platforms
# 允许celery以root身份启动
platforms.C_FORCE_ROOT
= True
worker_log_path =
os.path.join(os.path.dirname(os.path.dirname(__file__))+'/logs',
'celery.log')
beat_log_path
= os.path.join(os.path.dirname(os.path.dirname(__file__))+'/logs',
'beat.log')
tasks
= ['tasks.login',
'tasks.user']
# include的作用就是注册服务化函数
app =
Celery('weibo_task',
include=tasks,
broker=get_broker_or_backend(1),
backend=get_broker_or_backend(2))
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERYD_LOG_FILE=worker_log_path,
CELERYBEAT_LOG_FILE=beat_log_path,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_QUEUES=(
Queue('login_queue',
exchange=Exchange('login',
type='direct'),
routing_key='for_login'),
Queue('user_crawler',
exchange=Exchange('user_info',
type='direct'),
routing_key='for_user_info'),
Queue('fans_followers',
exchange=Exchange('fans_followers',
type='direct'),
routing_key='for_fans_followers'),
)
|
上述代码我指定了有login_queue
、user_crawler
、fans_followers
三个任务队列。它们分别的作用是登录、用户信息抓取、粉丝和关注抓取。现在假设我有三台爬虫服务器A、B和C。我想让我所有的账号登录任务分散到三台服务器、让用户抓取在A和B上执行,让粉丝和关注抓取在C上执行,那么启动A、B、C三个服务器的celery worker的命令就分别是
1
2
3
|
celery
-A
tasks.workers
-Q
login_queue,user_crawler
worker -l
info -c
1 # A服务器和B服务器启动worker的命令,它们只会执行登录和用户信息抓取任务
celery
-A
tasks.workers
-Q
login_queue,fans_followers
worker -l
info -c
1 # C服务器启动worker的命令,它只会执行登录、粉丝和关注抓取任务
|
然后我们通过命令行或者代码(如下)就能发送所有任务给各个节点执行了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# coding:utf-8
from tasks.workers
import app
from
page_get import
user as
user_get
from db.seed_ids
import get_seed_ids,
get_seed_by_id,
insert_seeds,
set_seed_other_crawled
@app.task(ignore_result=True)
def crawl_follower_fans(uid):
seed
= get_seed_by_id(uid)
if
seed.other_crawled
== 0:
rs
= user_get.get_fans_or_followers_ids(uid,
1)
rs.extend(user_get.get_fans_or_followers_ids(uid,
2))
datas
= set(rs)
# 重复数据跳过插入
if
datas:
insert_seeds(datas)
set_seed_other_crawled(uid)
@app.task(ignore_result=True)
def
crawl_person_infos(uid):
"""
根据用户id来爬取用户相关资料和用户的关注数和粉丝数(由于微博服务端限制,默认爬取前五页,企业号的关注和粉丝也不能查看)
:param uid: 用户id
:return:
"""
if
not uid:
return
# 由于与别的任务共享数据表,所以需要先判断数据库是否有该用户信息,再进行抓取
user
= user_get.get_profile(uid)
# 不抓取企业号
if
user.verify_type
== 2:
set_seed_other_crawled(uid)
return
app.send_task('tasks.user.crawl_follower_fans',
args=(uid,),
queue='fans_followers',
routing_key='for_fans_followers')
@app.task(ignore_result=True)
def
excute_user_task():
seeds
= get_seed_ids()
if
seeds:
for
seed in
seeds:
# 在send_task的时候指定任务队列
app.send_task('tasks.user.crawl_person_infos',
args=(seed.uid,),
queue='user_crawler',
routing_key='for_user_info')
|
这里我们是通过 queue='user_crawler',routing_key='for_user_info'
来将任务和worker进行关联的。
关于celery任务路由的更详细的资料请阅读官方文档。
到这里,基本把微博信息抓取的过程和分布式进行抓取的过程都讲完了,具体实现分布式的方法,可以读读基础篇。由于代码量比较大,我并没有贴上完整的代码,只讲了要点。分析过程是讲的抓取过程的分析和页面解析的分析,并在最后,结合分布式,讲了一下使用任务队列来让分布式爬虫更加灵活和可扩展。
如果有同学想跟着做一遍,可能需要参考分布式微博爬虫的源码,自己动手实现一下,或者跑一下,印象可能会更加深刻。文章评论
- 登录后评论
点击排行
-
php-fpm安装、配置与优化
转载自:https://www.zybuluo.com/phper/note/89081 1、php中...
-
centos下postgresql的安装与配置
一、安装(以root身份进行)1、检出最新的postgresql的yum配置从ht...
-
Mysql的大小写敏感性
MYSQL在默认的情况下查询是不区分大小写的,例如:CREATE TABLE...
-
关于URL编码
转载自:http://www.ruanyifeng.com/blog/2010/02/url_encoding....
-
header中的Cache-control
网页的缓存是由HTTP消息头中的“Cache-control”来控制的,常见的...