scrapy排重机制

在 Wed 16 September 2015 发布于 源码分析 分类

本篇文章主要的内容是scrapy在单机排重下的机制, 并提共分布式scrapy爬虫服务的url排重方案

scrapy在单机下设置url是否排重过滤很简单, 在每次抛出对该url构造的Request对象给调度器(schedler)时候, 设置Request的参数dont_filter 是否为True来让schedler判断时候对其走排重过滤的逻辑, dont_filter的默认值为False, 即该url会走排重逻辑 源码逻辑如下:

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        dqok = self._dqpush(request)
        if dqok:
            self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
        else:
            self._mqpush(request)
            self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
        self.stats.inc_value('scheduler/enqueued', spider=self.spider)
        return True

源码中的self.df就是scrapy调度器(schedler)在构造时初始化的时候设置的排重逻辑类

def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None, logunser=False, stats=None):
        self.df = dupefilter

scrapy的排重逻辑默认的是scrapy.dupefilters.RFPDupeFilter, 在settings里面有参数DEPEFILTER_CLASS可以进行设置不同的排重逻辑。先看看scrapy默认的排重机制的实现。 首先通过url指纹生成算法生成url指纹, 然后判断其是否在url指纹库里面, 若存在直接返回True。 如不存在,先存储在url指纹库里里面,然后写入到文件(文件request.seen存在于通过命令行-s JOBDIR设置的文件夹里面,这个后续文章回说到 )。

def request_seen(self, request):
    fp = self.request_fingerprint(request)
    if fp in self.fingerprints:
        return True
    self.fingerprints.add(fp)
    if self.file:
        self.file.write(fp + os.linesep)

def request_fingerprint(self, request):
    return request_fingerprint(request)

这里的request_fingerprint使用scrapy.utils.request.request_fingerprint来给给定的request生成url指纹, 其中的include_headers参数是scrpay考虑到有些request虽然请求的url都是相同的,但是因为有cookie的存在可能会产生不同的页面内容,如果这个参数值不为空的情况下,会把headers里面的key/value对先排序然后转换为小写构造一个元组做__fingerprint_cache的key。代码逻辑如下:

#__fingerprint_cache是个弱引用字典, 若引用的好出是不会增加对象的引用计数,可以很好的预防引用计数不为0导致python的内存泄露
__fingerprint_cache = weakref.WeakKeyDictinoary()
def request_fingerprint(request, include_headers=None):
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
            for h in sorted(include_headers))
    #取出_fingerprint_cache中request对应的值
    cache=_fingerprint_cache.setdefault(request, {})
    #因为include_headers默认为空值,且由上面RFPDupeFilter的实现可以看出,include_headers在源码的逻辑里面都是空值,也就是说_fingerprint_cache里面保存的都是请求的Method,url和请求所带的body经过hashlib.sha1算法后的到的url指纹。若include_headers不为空,则也会对include_headers里面的各项进行hashlib.sha1计算。
    if include_headers not in cache:
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                        for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]                       

在这里对于url进行hash的时候,scrapy会处理url。需要明白下面的url的效果应该是相同的:

 http://www.example.com/query?id=111&cat=222
 http://www.example.com/query?cat=222&id=111

源码中的canonicalize_url的逻辑就是分析url的成分, 把url中的query进行key/value的排序,剔除不安全字符:

def canonicalize_url(url,keep_blank_values=True,keep_fragments=False, 
                    encoding = None):
    scheme, netloc, path, params, query, fragment = parse_url(url)
    keyvals = parse_sql(query, keep_blank_values)
    keyvals.sort()
    path = safe_url_string(_unquotepath(path)) or '/'
    fragment = '' if not keep_fragments eles fragment
    return urlunparse((scheme, netloc.lower(), path, params, query, fragment))

在日常的工作中,单实例scrapy爬虫的爬取能力很有限,需要多个scrapy爬虫实例。多scrapy爬虫实例需要解决的问题之一就是怎样进行url的排重问题。对于这个问题,我们可以使用关系形数据库或者nosql来实现url的排重,只需要做的是继承RFPDupeFilter并重写request_seen的逻辑即可!下面给出一个简单的实现。

class MongodbDupeFilter(RFpDupeFilter):
    def __init__(self, *args, **kvargs):
        super(MongodbDupeFilter, self).__init__(*args, **kwargs)
        self.mongoConn = pymongo.MongoClient(settings["MONGODB_SERVER_ADDRESS"])
    def request_seen(self, request):
        url = canonicalize_url(request.url)
        db = self.mongoConn[settings['MONGODB_DUPEFILTER_DB_NAME']]
        collection = db[settings['MONGODB_DUPEFILTER_TABLE_NAME']]
        count = collection.find({'url' : url})
        if count > 0:
            return True
        collection.insert({'url' : url})

    def close(self, reason):
        self.mongoConn.close()
        super(MongodbDupeFilter, self).close()