企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 1. 分布式爬虫 使用第三方库scrapy-redis,GitHub地址:https://github.com/rmax/scrapy-redis 1. Scrapy多个spider不能共享待爬取队列Scrapy queue, 即Scrapy本身不支持爬虫分布式。 ### 1.1 架构 ![](https://box.kancloud.cn/977dc54d05df123c98601a5a474881b4_1534x1010.png) 结构变化是在请求队列方面,由redis统一管理请求队列,协调多个spider服务。redis提供数据存储(爬取到的数据和待爬取请求URL)、指纹去重、 分布式爬虫与scrapy框架的改变主要是下边: 1. 有请求过来了,有spider引擎交个Scheduler,Scheduler将请求交给redis进行验证(指纹队列),如果该URL没有爬取,则redis将URL放回给Scheduler 2. redis与爬虫相关的三个队列 "yaoq:dupefilter" :去重指纹队列 "yaoq:items" : 存储爬取到的数据队列 "yaoq:requests" : 待爬取 ## 2. 实践 * scrapy单机爬虫时,我们主要自己编写两个spider类:CrawlSpider和Spider,在分布式中对应RedisCrawlSpider和RedisSpider ### 2.1 spider 1. 继承RedisCrawlSpider 2. 没有start_urls(在redis中给定,多个爬虫端去随机获取) ~~~ pip3 install scrapy-redis ~~~ ~~~ __author__ = 'dailin' from scrapy_redis.spiders import RedisCrawlSpider from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule from scrapyredis.items import Yaoq class YaoQ(RedisCrawlSpider): name = 'yaoq' allowed_domains = ['yaoq.net'] rules = ( # 只提取复合规则的页面链接,不做分析,所以跟页面但是没有,follow是对网易深一层的爬取,false表示不提取连接,也不请求页面上的连接 Rule(LinkExtractor(allow=r'www.yaoq.net/thread.*\.html'), callback='parse_item', follow=False), Rule(LinkExtractor(allow=r'www.yaoq.net/forum-95-\d+\.html'), follow=True) ) def parse_item(self, response): try: item = Yaoq() # print(response.text) author = response.xpath("//div[@class='pti']//div[@class='authi']/a[1]/text()").extract()[0] authorLocation = response.xpath("//div[@class='pti']//div[@class='authi']/a[1]/@href").extract()[0] pubDate = response.xpath("//div[@class='pti']//div[@class='authi']//em[1]/text()").extract()[0] # 提取所有文本 content = \ response.xpath("//div[@class='pcb']//div[@class='t_fsz']/table[1]//tr")[0].xpath('string(.)').extract()[0] contentData = content.replace("\r\n", "") title = response.xpath("//span[@id='thread_subject']/text()").extract()[0] print(author) print(authorLocation) print(pubDate) print(contentData) print(title) item['title'] = title item['pubDate'] = pubDate item['author'] = author item['authorLocation'] = authorLocation item['content'] = contentData item['id'] = str(uuid.uuid1()) yield item except BaseException as e: print(e) ~~~ ### 2.2 settings ~~~ # -*- coding: utf-8 -*- # 指定使用scrapy-redis的调度器 SCHEDULER = "scrapy_redis.scheduler.Scheduler" # 指定使用scrapy-redis的去重 DUPEFILTER_CLASS = 'scrapy_redis.dupefilters.RFPDupeFilter' # 指定排序爬取地址时使用的队列, # 默认的 按优先级排序(Scrapy默认),由sorted set实现的一种非FIFO、LIFO方式。 SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderPriorityQueue' # 可选的 按先进先出排序(FIFO) # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderQueue' # 可选的 按后进先出排序(LIFO) # SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.SpiderStack' # 在redis中保持scrapy-redis用到的各个队列,从而允许暂停和暂停后恢复,也就是不清理redis queues、 # 中断后可以继续爬取 SCHEDULER_PERSIST = True # 只在使用SpiderQueue或者SpiderStack是有效的参数,指定爬虫关闭的最大间隔时间 # SCHEDULER_IDLE_BEFORE_CLOSE = 10 # 通过配置RedisPipeline将item写入key为 spider.name : items 的redis的list中,供后面的分布式处理item # 这个已经由 scrapy-redis 实现,不需要我们写代码 ITEM_PIPELINES = { 'example.pipelines.ExamplePipeline': 300, 'scrapy_redis.pipelines.RedisPipeline': 400 } # 指定redis数据库的连接参数 # REDIS_PASS是我自己加上的redis连接密码(默认不做) REDIS_HOST = '127.0.0.1' REDIS_PORT = 6379 #REDIS_PASS = 'redisP@ssw0rd' # LOG等级 LOG_LEVEL = 'DEBUG' #默认情况下,RFPDupeFilter只记录第一个重复请求。将DUPEFILTER_DEBUG设置为True会记录所有重复的请求。 DUPEFILTER_DEBUG =True ~~~ ### 2.3 运行爬虫 #### 2.3.1 运行爬虫 ~~~ scrapy runspider scrapyredis/spiders/YaoQ.py ~~~ > 此时爬虫处于等待状态(因为没有给定start_urls) #### 2.3.2 在redis中给定start_urls 1. 此时redis中,没有任何key ![](https://box.kancloud.cn/c2ff518b974965e7ebd03e89f323e64b_543x76.png) 2. 给定起始url(就是想队列中放入URL,让爬虫去爬) >1. 这个起始的URL在redis中对应的键名默认是`spidername:start_urls`的组合,例如spider名称为 > yaoqyaoq,对应的键名为yaoq:start_urls。 > 2. 也可以在程序中显示的指定(start_url名称任意,但是按照规范来较好) ~~~ lpush yaoq:start_urls 'http://www.yaoq.net/forum-95-1.html' ~~~ > 此时爬虫开始,其他爬取到的URL也会存储到redis中 ![](https://box.kancloud.cn/7d53c273c684893362ee2d4a39c71f49_964x140.png) > start_urls被取走,这个队列消失,此时redis中有三个和该spider相关的队列 ![](https://box.kancloud.cn/7d53c273c684893362ee2d4a39c71f49_964x140.png) > 1) yaoq:dupefilter:去重队列 > 2) yaoq:items: 数据 > 3) yaoq:requests :待爬取队列 #### 2.3.3 处理爬取到的数据 从redis读取爬取的数据,爬取到的数据默认键是 spidername:items ,例如yaoq:items 把数据存储到mysql(一直阻塞地消费yaoq:items队列) ~~~ # coding=utf-8 __author__ = 'dailin' import json import redis import pymysql def main(): # 指定redis数据库信息 rediscli = redis.StrictRedis(host='192.168.56.130', port=6379, db=0) # 指定mysql数据库 mysqlcli = pymysql.connect(host='192.168.56.130', user='root', passwd='tuna', db='crawl_data', port=3306, charset="utf8", use_unicode=True) while True: # FIFO模式为 blpop,LIFO模式为 brpop,获取键值 source, data = rediscli.blpop(["yaoq:items"]) item = json.loads(data) try: # 使用cursor()方法获取操作游标 cur = mysqlcli.cursor() sql = "INSERT INTO yaoq (author, author_location, content, id, pub_date, title) VALUES (%s, %s, %s, %s, %s, %s)" # 使用execute方法执行SQL INSERT语句 content = item['content'] print(content) cur.execute(sql,(item['author'], item['authorLocation'], item['content'] ,item['id'], item['pubDate'], item['title'])) # 提交sql事务 mysqlcli.commit() # 关闭本次操作 cur.close() print("inserted %s" % item['title']) except Exception as e: print(e) if __name__ == '__main__': main() ~~~