The RSS
用gcrawler进行多级页面并发下载的例子
原文链接:http://mental.we8log.com/entry/218/my_weblog
分类:Python相关
TAG:
本文被浏览了558次
作者:猛禽 | 2011-04-07 22:14:00 | 0评

前文《gcrawler:一个基于gevent的简单爬虫框架》中说到:

当然要实现像Scrapy那样的方式也可以,只要在worker里把下一级页 面链接再传递给scheduler即可。

原 文中所举例的情况主要是同时对多个链接进行并发下载的情况,但对于要下载一个链接及其下一级甚至N级链接的情况,如果还按原文那样在一个worker里串 行下载的话就不能达到并发的效果了。而这里所谓的“Scrapy那样的方式”就是把某个页面中解析出来的下一级页面链接传回引擎,使之可以对这些下一级页 面进行并发下载,提高下载效率。

在评论里gyj_freedom希望进一步了解这个方式的实现,那现在就来写一下吧。

最直接能想到的实现就是在GCrawler类的dowoker里调用一个方法去解析结果,然后把下一级的链接放到qin里去。但是我觉得还是尽量保持这个类的功能单一比较好,所以不改GCrawler了。

另一个方法就是在Spider类里去实现,我本来是打算写一个Batch download spider,不过后来发现其实可以在原来那个Downloader类的基础上修改实现,于是我就直接把Downloader类给改了,这就是现在这个例子。

基 本的思路就是scheduler生成器在生成完所有urls以后继续等待下一步的解析结果,然后继续将解析结果生成返回。在worker方法里则增加一个 解析方法调用,然后把返回的结果能过类成员传递给scheduler。这样包括下一级页面在内的所有链接都可以通过并发的方式下载了。

新的Downloader是这样实现的:

class Downloader:
    def __init__(self, urls):
        self.urls = urls
        self.subitems = queue.Queue(-1)
        self.parsings = 0

    def scheduler(self):
        parse=None
        if hasattr(self, 'parse'):
            parse = self.parse
        for u in self.urls:
            if parse != None:
                self.parsings += 1
            yield dict(url=u, parse=parse)
        while self.parsings > 0:
            sleep(1)
            while not self.subitems.empty():
                item = self.subitems.get()
                try:
                    if item['parse'] != None:
                        self.parsings += 1
                except:
                    pass
                yield item

    def worker(self, item):
        r = None
        try:
            r = self.doWorker(item['url'])
            if r != None and item['parse'] != None:
                try:
                    new_items, r = item['parse'](r)
                    for i in new_items:
                        self.subitems.put(i)
                finally:
                    self.parsings -= 1
        except Exception, e:
            logger.error("Error on get %s:%s
%s" % (item['url'], e, traceback.format_exc()))
        return r

    @retryOnURLError(3)
    def doWorker(self, url):
        logger.debug("Download starting...%s" % url)
        f = urllib2.urlopen(url)
        data = f.read()
        f.close()
        return dict(url=url, data=data)

其 中new_items就是从解析结果中取得并且要传递给scheduler的下级页面的链接队列。worker里通过判断item的parse值来确定是 否进行页面解析,如果解析的话,则把解析结果更新到new_items中去。如果解析过的页面结果不需要传递给pipeline保存的话(比如这只是一个 索引页面),只要让解析函数返回的tuple第二值(如下例中的result)为None即可。

总体来说这个实现还是有点Ugly的,主要 是因为页面解析层级是不确定的,scheduler难以确定结束的时机,所以增加了一个parsing计数器来记录当年在进行parsing的 workers数量。因为gevent的并发是通过协程实现的,没有线程那样的并发冲突,所以这里没有加锁,应该是不会有并发冲突的问题。不过还是需要经 过一段时间的实践检验看看,目前暂时只能先这样。

如果不使用多级并发下载的话,新的Downloader的用法与原来一样。如果要使用多级并发下载的话,只要在派生类里增加一个(两级的情况,更多级的话可能需要更多的)解析方法。为了保持与scrapy一致,默认的解析方法也是叫做parse。

具体的spider实现举例如下:

class ImagesDownloader(Downloader):
    def parse(self, result):
        pat_imgs = re.compile(u"<img.*src="([^"]*)", re.I | re.U)
        imgs = list(set(pat_imgs.findall(result['data'])))
        print "Images count %s" % len(imgs)
        new_items = []
        for i in imgs:
            if i != "":
                item = dict(url=i, parse=None)
                new_items.append(item)
        return new_items, result

    def pipeline(self, results):
        print "== %s" % datetime.now()
        for r in results:
            r['datasize'] = len(r['data'])
            print "Data fetched : %(url)s(%(datasize)s)" % r

如 上面的代码所表现的那样,唯一的不同就是增加了一个parse方法,在这个方法里,你可以用任何你喜欢的方式去解析页面(比如正则表达式、XPath、 PyQuery等,这个例子里是用正则表达式),然后将解析结果返回给Downloader。这个例子只实现了一级链接(下载页面上的全部图片链接),如 果还要进一步解析下一级页面甚至N级,只要在上面代码中“item = dict(url=i, parse=None)”中把None换成下一级的解析方法并实现这个方法即可。

更新后的代码在这里下载:1.9KB,或者访问Google Code的本项目页面

推送到[go4pro.org]

为了保证评论的有效性,我们需要对评论做出审核后进行发布。另外敬请在表单的Approved字段中随便填写一些东西——这是为了防止spam。

Ver 2.0,2009 - 2012,Go4Pro.org
知识共享许可协议
Go4Pro.org is licensed under a Creative Commons 署名-非商业性使用-相同方式共享 3.0 Unported License.
Go4Pro老版请访问:legacy.go4pro.org | Q & A

Valid XHTML 1.0 Transitional  Valid CSS!