Scrapy框架组件之调度器

在 Fri 06 November 2015 发布于 源码分析 分类

本片文章分析的是Scrapy框架的调度器的实现。在Scrapy框架中,调度器的作用是调度Request对象,弹出Request对象提交给Scrapy引擎进行fetch,压入新的需要fetch的Request对象,涉及到Request的存储调度。

在进行下面之前,需要了解一个Python的开源库queuelib, 它现在从Scrapy框架中独立出去,作为一个单独的库开源出去,其实现了一系列的队列。 下面简单的说下:

FifoDiskQueue: 这是个FIFO队列,使用文件块来实现落地磁盘的功能。每个文件块存储的数据的个数为chunksize的大小,这个值默认的情况下是100000条,且在落地的文件夹里面会有个info.json文件,其内容格式为

 {"tail": [0, 0, 0], "head": [10, 0], "chunksize": 100000, "size": 1000000}

size为当前FifoDiskQueue的大小;chunksize为每个文件块能够存储多少条数据;head的第一个元素为要写入数据的文件块的下标,第二个元素为该文件块已经写了多少条数据;tail的第一个元素为要读的文件块的下标,第二个元素为累计读取了多少条数据,第三个元素文件的offset记录读取了多少字节。需要注意的是在每次操作完了FifoDiskQueue实例后,都需要调用其成员函数close,以写入状态到info.json这个文件中,每次在实例化FifoDiskQueue的时候,都会通过info.json来初始化状态。

LifoDiskQueue: 所有数据都落地到一个文件中去,其pop操作都是通过seek来实现,truncate来删除多余的数据。 还有一点需要注意的是FifoDiskQueue/LifoDiskQueue操作的元素都必须是bytes的实例。 FifoSQLiteQueue/LifoSQLiteQueue: 提供了一种使用数据库来实现queue的思路 FifoMemoryQueue/LifoMemeoryQueue : 基于内存的先见先出/后进先出队列,其内部是通过collections.deque来实现的。

PriorityQueue: 是一个有权重的队列,其在实例化的时候接受一个队列构造函数和一个整数数值类型的权重值作为参数(数值越小权重越大),其维护一个字典,就是权重值,就是存储元素的队列。

调度器在构造的时候使用了默认settings设置:

pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE']) #queuelib.PriorityQueue
dqclass = load_object(settings['SCHEDULER_DISK_QUEUE']) #scrapy.squeues.PickleLifoDiskQueue
mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE']) #scrapy.squeues.LifoMemoryQueue

对于scrapy.squeues.LifoMemoryQueuequeuelib.PriorityQueue上面已经提到,这里不再多说,让我们看看PickleLifoDiskQueue的定义:

from  queuelib import queue
#通过继承queue_class来构造一个新的queue,其元素都是序列化后的对象
def _serializable_queue(queue_class, serialize, deserialize):
    class SerializableQueue(queue_class):
        def push(self, obj):
            s = serialize(obj)
            super(SerializableQueue, self).push(s)
        def pop(self):
            s = super(SerializableQueue, self).pop()
            if s:
                return deserialize(s)
    return SerializableQueue
def _pickle_serialize(obj):
    try:
        return pickle.dumps(obj, protocol=2)
    # Python>=3.5 raises AttributeError here while
    # Python<=3.4 raises pickle.PicklingError
    except (pickle.PicklingError, AttributeError) as e:
        raise ValueError(str(e))

PickleLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \
    _pickle_serialize, pickle.loads)

这里可以很清楚地看到PickleLifoDiskQueue就是把操作的元素序列化了LifoDiskQeuue能操作的bytes对象实例。

了解了queuelib, Scrapy框架的组件调度器的实现源码scheduler.py就不难理解了,直接上源码,注解都在源码中。

class Scheduler(object):

    def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None,
                 logunser=False, stats=None, pqclass=None):
        self.df = dupefilter
        self.dqdir = self._dqdir(jobdir)
        self.pqclass = pqclass
        self.dqclass = dqclass
        self.mqclass = mqclass
        self.logunser = logunser
        self.stats = stats

    @classmethod
    def from_crawler(cls, crawler):
        settings = crawler.settings
        dupefilter_cls = load_object(settings['DUPEFILTER_CLASS'])
        dupefilter = dupefilter_cls.from_settings(settings)
        pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE']) #queuelib.PriorityQueue
        dqclass = load_object(settings['SCHEDULER_DISK_QUEUE']) #scrapy.squeues.PickleLifoDiskQueue
        mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE']) #scrapy.squeues.LifoMemoryQueue
        logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS')
        return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser,
                   stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)

    def has_pending_requests(self):
        return len(self) > 0

    def open(self, spider):
        self.spider = spider
        #基于内存的优先队列
        self.mqs = self.pqclass(self._newmq)
        # 若存在工作目录,根据工作目录中的数据来初始化,若没有工作目录,直接赋值为None
        self.dqs = self._dq() if self.dqdir else None
        return self.df.open()

    # 会调用优先队列的close方法,可以看看queuelib中的源码, 把还没有处理的request,即active状态,转换为json,当再次启动spider,初始化的时候会从其入读上次没有处理完的request
    def close(self, reason):
        if self.dqs:
            prios = self.dqs.close()
            with open(join(self.dqdir, 'active.json'), 'w') as f:
                json.dump(prios, f)
        return self.df.close(reason)

    def enqueue_request(self, request):
        #如果需要过滤,并且该request的url在url指纹库里面则打印日志,直接返回false
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        # 入队列
        dqok = self._dqpush(request)
        #这代表这落地到本地
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            #放到内存中去
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True
    # 取request
    def next_request(self):
        # 先从内存中取
        request = self.mqs.pop()
        if request:
            self.stats.inc_value('scheduler/dequeued/memory', spider=self.spider)
        else:
            #内存中没有取到,就从落地到本地磁盘中取request
            request = self._dqpop()
            if request:
                self.stats.inc_value('scheduler/dequeued/disk', spider=self.spider)
        if request:
            self.stats.inc_value('scheduler/dequeued', spider=self.spider)
        return request

    # 需要注意的是,虽然有两种情况,一种是落地到本地磁盘,一种直接落地到内存中,其中前者的情况特殊
    # 其即可能落入到本地磁盘也可以入到内存中去
    def __len__(self):
        return len(self.dqs) + len(self.mqs) if self.dqs else len(self.mqs)

    # 若self.dps为空,直接返回,否则把request转换成dict类型序列化后放入queue中
    def _dqpush(self, request):
        if self.dqs is None:
            return
        try:
            reqd = request_to_dict(request, self.spider)
            self.dqs.push(reqd, -request.priority)
        except ValueError as e: # non serializable request
            if self.logunser:
                logger.error("Unable to serialize request: %(request)s - reason: %(reason)s",
                             {'request': request, 'reason': e},
                             exc_info=True, extra={'spider': self.spider})
            return
        else:
            return True

    # 把request推入到内存中
    def _mqpush(self, request):
        self.mqs.push(request, -request.priority)

    # 从queue中取request, 因为在落地到本地磁盘上是被转换成的dict,所以需要从dict转换到request
    def _dqpop(self):
        if self.dqs:
            d = self.dqs.pop()
            if d:
                return request_from_dict(d, self.spider)

    def _newmq(self, priority):
        return self.mqclass()

    def _newdq(self, priority):
        # PickleLifoDiskQueue 创建队列
        return self.dqclass(join(self.dqdir, 'p%s' % priority))

    def _dq(self):
        activef = join(self.dqdir, 'active.json')
        if exists(activef):
            with open(activef) as f:
                prios = json.load(f)
        else:
            prios = ()
        # 创建优先队列
        q = self.pqclass(self._newdq, startprios=prios)
        if q:
            logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                        {'queuesize': len(q)}, extra={'spider': self.spider})
        return q

    #这是对参数 -s JOBDIR=/path/to/jobdir的作用,一般我们在运行scrapy的时候,request队列在默认情况下是放在
    #内存中的,大量的request队列,可能导致内存的暴涨,再有,我们也会有需要有暂停/重新启动爬虫的需求,可以选就
    #一种持久化的方式,即落地request队列
    # 该函数的作用是在指定目录的情况下,确定该目录下存在'requests.queue'子目录,供落地request queue
    # 若没有指定,那就是默认模式,使用内存
    def _dqdir(self, jobdir):
        if jobdir:
            dqdir = join(jobdir, 'requests.queue')
            if not exists(dqdir):
                os.makedirs(dqdir)
            return dqdir

调度器中的各个队列存储的状态可以通过Scrapy的内置的Telnet服务来查看,具体请看Telnet Console