Nutch源代码分析

WalterLee

贡献于2011-10-13

字数:0 关键词: 搜索引擎 nutch

www.73cc.com 1 NUTCH 一 Nutch 的大致工作流程可以通过上一篇文章有了一定的了解了。在上一篇文章中,主要 是针对一幅 Nutch 工作流程图片来了解 Nutch 的工作流程,十分感性,并没有涉及到任 何关于 Nutch 的包和类。这里通过在网上下载的一个《Nutch 入门学习》的 PDF 文档中 介绍的内容,来详细组织一下,加深了解,为深入研究 Nutch 的源代码奠定良好的基础。 这里通过几个标题来叙述。 Nutch 爬虫工作策略 Nutch 爬虫的工作策略一般则可以分为累积式抓取(cumulative crawling)和增量式抓取 (incremental crawling)两种。 累积式抓取是指从某一个时间点开始,通过遍历的方式抓取系统所能允许存储和处理的 所有网页。在理想的软硬件环境下,经过足够的运行时间,累积式抓取的策略可以保证 抓取到相当规模的网页集合。但由于 Web 数据的动态特性,集合中网页的被抓取时间点 是不同的,页面被更新的情况也不同,因此累积式抓取到的网页集合事实上并无法与真 实环境中的网络数据保持一致。 增量式抓取是指在具有一定量规模的网络页面集合的基础上,采用更新数据的方式选取 已有集合中的过时网页进行抓取,以保证所抓取到的数据与真实网络数据足够接近。进 行增量式抓取的前提是,系统已经抓取了足够数量的网络页面,并具有这些页面被抓取 的时间信息。 面向实际应用环境的网络蜘蛛设计中,通常既包括累积式抓取,也包括增量式抓取的策 略。累积式抓取一般用于数据集合的整体建立或大规模更新阶段;而增量式抓取则主要 针对数据集合的日常维护与即时更新。 在确定了抓取策略之后,如何从充分利用网络带宽,合理确定网页数据更新的时间点就 成了网络蜘蛛运行策略中的核心问题。 总体而言,在合理利用软硬件资源进行针对网络数据的即时抓取方面,已经形成了相对 比较成熟的技术和实用性解决方案,这方面目前所需解决的主要问题,是如何更好的处 理动态网络数据问题(如数量越来越庞大的 Web2.0 数据等),以及更好的根据网页质量 修正抓取策略的问题。 下面,通过针对 Nutch 工作流程中,各个关键的环节进行详细分析阐述,可能要涉及到 Nutch 的源代码:包和类,有助于在阅读 Nutch 源代码的时候提供导向作用。 Nutch 工作流程:建立初始 URL 集合分析 初始 URL 集的建立有两种方式:超链接和站长提交。 超链接是指机器人程序根据网页链到其他网页中的超链接,就像日常生活中所说的“一传 十,十传百……”一样,从少数几个网页开始,连到数据库上所有到其他网页的链接。理 论上,若网页上有适当的超连结,机器人便可以遍历绝大部分网页。 站长提交是指在实际运行中,爬虫不可能抓取到所有站点,为此,网站站长可以向搜索 引擎进行提交,要求收录,搜索引擎经过核查之后,便将该网站加入到 URL 集合中,进 行抓取。 Nutch 工作流程:inject 操作分析 inject 操作调用的是 nutch 的核心包之一 crawl 包中的类 org.apache.nutch.crawl.Injector。 它执行的结果是:crawldb 数据库内容得到更新,包括 URL 及其状态。 inject 操作主要作用可以从下面 3 方面来说明: www.73cc.com 2 (1) 将 URL 集合进行格式化和过滤,消除其中的非法 URL,并设定 URL 状态 (UNFETCHED),按照一定方法进行初始化分值; (2) 将 URL 进行合并,消除重复的 URL 入口; (3) 将 URL 及其状态、分值存入 crawldb 数据库,与原数据库中重复的则删除旧的,更 换新的。 Nutch 工作流程:generate 操作分析 generate 操作调用的是 crawl 包中的类 org.apache.nutch.crawl.Generator。它执行的结果是: 创建了抓取列表,存放于 segments 目录下,以时间为文件夹名称。循环抓取多少次, segments 文件夹下就会有多少个以时间为名称的文件夹。 generate 操作主要作用可以从下面 3 方面来说明: (1) 从 crawldb 数据库中将 URL 取出并进行过滤; (2) 对 URL 进行排序,通过域名、链接数和一种 hash 算法综合进行降序排列; (3) 将排列列表写入 segments 目录中。 Nutch 工作流程:fetch 操作分析 fetch 操作调用的是 fetcher 包中的类 org.apache.nutch.fetcher.Fetcher。它执行的结果是: 将页面内容抓取下来,存于 segment 目录下。 fetch 操作主要作用可以从下面 4 方面来说明: (1) 执行抓取,按照 segments 目录下的抓取列表进行; (2) 抓取过程中,页面的 URL 地址可能因为链接发生改变,从而需要更新 URL 地址; (3) 抓取采用多线程方式进行,以提高抓取速度; (4) fetch 操作过程中调用了 parse 操作。 Nutch 工作流程:parse 操作分析 parse 操作调用的是 parse 包中的类 org.apache.nutch.parse.ParseSegment。它执行的结果是: 将 fetch 得到的页面解析为 text 和 data,存于 segments 目录下。 parse 操作主要作用可以从下面 3 方面来说明: (1) 解析 segment 中由 fetch 得到的页面,并进行整理,将页面分成为 parse-date 和 parse-text; (2) parse-date 中保存的是页面的题名、作者、日期、链接等内容; (3) parse-text 中保存的是页面的文本内容。 例如,我只执行一次抓取任务,就执行了上述的一些操作,操作的结果直接反映在 segments 目录中。可以看到在 home\SHIYANJUN\nutch-0.9\mydir\segments 目录下面创 建了 20081004102407 这个目录,该目录中包含 6 个目录:content、crawl_fetch、 crawl_generate、crawl_parse、parse_data、parse_text,从目录名称就可以大致知道该目 录存取的相关内容信息。 Nutch 工作流程:updatedb 操作分析 updatedb 操作调用的是 crawl 包中的类 org.apache.nutch.crawl.CrawlDb。它执行的结果是: 更新了 crawldb 数据库,为下一轮抓取做准备。 updatedb 操作主要作用如下: 根据 segments 目录下 fetch 目录和 parse 目录中的内容,对 crawldb 进行更新,增加新的 URL,更换旧的 URL。 Nutch 工作流程:invertlinks 操作分析 invertlinks 操作用来更新 linkDB,为建立索引的工作提供准备。 Nutch 工作流程:index 过程分析 www.73cc.com 3 index 过程,即索引过程,包括:将数据转换成文本、分析文本、将分析过的文本保存到 数据库中这三个操作。 (1) 转换成文本 在索引数据之前,首先必须将数据转换成 nutch 能够处理的格式――纯文本字符流。但 是,在现实世界中,信息多以富媒体(rich media)文档格式呈现:PDF、WORD、EXCEL、 HTML、XML 等。为此,nutch 采用了插件机制(plugin),通过各种各样的文档解析器, 将富媒体转换成纯文字字符流。文档解析器种类繁多,开发人员可以根据需要进行选择, 同时还可以自己修改或编写,非常灵活方便。 (2) 分析文本 在对数据进行索引前,还需要进行预处理,对数据进行分析使之更加适合被索引。分析 数据时,先将文本数据切分成一些大块或者语汇单元(tokens),然后对它们执行一些可选 的操作,例如:在索引之前将这些语汇单元转换成小写,使得搜索对大小写不敏感;最 有代表性的是要从输入中去掉一些使用很频繁但却没有实际意义的词,比如英文文本中 的一些停止词(a、an、the、in、on 等)。同样的,我们也需要分析输入的语汇单元,以便 从词语中去掉一些不必要的字母以找到它们的词干。这一处理过程称为分析(analyze)。 分析技术在索引和搜索时都会用到,比较重要。 (3) 将分析过的文本保存到数据库中 对输入的数据分析处理完成之后,就可以将结果写入到索引文件中。Nutch 采用的是 Lucene 的索引格式,可以参考关于 Lucene 的索引机制。Lucene 采用“倒排索引”的数据 结果来存储索引的。 Nutch 工作流程:搜索程序分析 Nutch 的搜索程序执行过程,可以从下面的步骤了解: (1) HTTP 服务器接收用户发送过来的请求。对应到 nutch 的运行代码中就是一个 servlet, 称为查询处理器(Query Handler)。查询处理器负责响应用户的请求,并将相应的 HTML 结果页面返回给用户。 (2) 查询处理器对查询语句做一些微小的处理并将搜索的项(terms)转发到一组运行索引 搜索器的机器上。Nutch 的查询系统似乎比 lucene 简单的多,这主要是因为搜索引擎的 用户对他自己所要执行的查询内容有非常清晰的思路。然而,lucene 的系统结构非常灵 活,且提供了多种不同的查询方式。看似简单的 nutch 查询最终被转换为特定的 lucene 查询类型。每个索引搜索器并行工作且返回一组有序的文档 ID 列表。 (3) 现在存在这大量从查询处理器返回过来的搜索结果数据流。查询处理器对这些结果 集进行比较,从所有的结果查找出匹配最好的那些。如果其中任何一个索引搜索器在 1~2 秒之后返回结果失败,该搜索器的结果将被忽略。因此,最后列表由操作成功的搜索器 返回的结果组成。 关于查询处理器 查询处理器对查询作了一些细微的处理,例如删除停止词(例如 the、of 等)。接着 nutch 需要执行一些操作以便于它在大规模的数据环境下能更好的工作。一个索引搜索器涉及 搜索的文档集数目非常庞大,所以 nutch 需要同时与很多索引搜索器交互来提高搜索速 率。实际运行环境中,为了保证系统级别的稳定性,文档集的段文件会被复制到多个不 同主机上。因为对于文档集中的每个段,查询处理器会随机的与其中一个可搜索到自身 的索引搜索器相交互。如果发现一个索引搜索器不能交互,查询处理器会通知之后的搜 索操作不使用该搜索器,但是查询处理器每隔一段时间会回头检查一次搜索器的状态, 以防该主机上的搜索器再次可用。 www.73cc.com 4 关于分析器 Nutch 使用自己的分析器,对应于 analysis 包。Nutch 把索引时进行分析所使用的二元语 法技术(bigram)与查询过程中对短语的优化技术结合在一起,通过二元语法技术可以把两 个连续的词组合成一个语汇单元。这就可以大大减少搜索时需要考虑的文档范围,例如, 包含词组 the quick 的文档比包含 the 的文档要少的多。 分析器对分析操作进行了封装。分析器通过执行若干操作,将文本语汇单元化,这些操 作可能包括提取单词、去除标点符号、去掉语汇单元上的音调符号、将字母转化为小写(也 称为规格化)、移除常用词、将单词转换为词干形式(词干还原),或者将单词转换为基本 形等。这个过程也称为语汇单元化过程。分析操作发生在两个阶段:建立索引和进行查 询时。 nutch 的其他一些特性 ● 为了获取小数量的文档(通常是 10 个左右),查询处理器会对每个索引搜索器进行查询。 因为最后的结果是从多个索引搜索器中合并得到的,所以就没有必要从一个数据源中获 取过多的文档结果,尤其是在用户很少去查看第一页之后的结果的情况下。 ● 实际上,在每个用户查询被处理之前,它会被扩展为十分复杂的 lucene 查询。每个索 引过的文档都包含了三个域:网页自身的内容,网页的 URL 文本值,以及由所有关键 (anchor)文本所组成的合成文档,这些关键文本可在导航网页的超链接中找到。每个域对 应一个不同的权重值。Nutch 的查询处理器生成一个 lucene 布尔查询,其中在三个域中 都包含了搜索引擎用户所输入的文本。 ● Nutch 也会特别的把那些在 web 上出现的非常频繁的关键字组作为一个整体来索引(其 中的许多关键字是与 HTTP 相关的词组)。这些关键字序列出现的非常频繁,所以无需花 费精力去对这些词序中的每个组成部分单独搜索,也不必查找出这些搜索结果中是否有 交集的部分。我们不用把这些项划分为单独的单词对来搜索文档,而是把它们作为一个 单元,当然前提是 nutch 在索引期间必须检测到它们是作为一个整体而存在的。另外, 在与索引搜索器交互之前,查询处理器会查找出用户输入的字符串中单词的任意组合。 如果这样一个词序确实存在,它的单词成员就会整合成一个特殊的搜索项。 ● 在使用 lucene 执行索引操作之前,nutch 的内容获取器/索引器需要预处理 HTML 文档。 它使用 NekoHTML 解析器剥离 HTML 中的内容,并索引其中的非标记文本。对于从 HTML 文档提取标题文本,NekoHTML 是很有建树的。 ● Nutch 进程间通信层(IPC)保持了查询处理器与索引搜索器间长时间的连接。查询处理 器端可能存在多个并发的线程,对于给定的地址,任一线程都可以向远程服务器发送调 用请求。服务器每接受一个请求之后,就会根据给定字符串尝试去查找对应的注册服务 (运行在自己线程上)。客户端的请求线程会一直阻塞其他事件,直到服务器端响应的 IPC 代码到达后,消息通知请求线程为止。如果服务器的响应花费的时间超过了 IPC 规定的 时限,IPC 的代码就会宣布此服务器不可用,并抛出一个异常。 ● 另外,nutch 的排序算法是仿照 google 的 PageRank 算法,关于 PageRank 算法的资料 有很多,推荐《Google 的秘密 PageRank 彻底解说中文版》。 www.73cc.com 5 NUTCH 二 在没有学习研究 Nutch 的源代码之前,我认为还是有必要对 Nutch 的工作流程有一个感性 的认识和了解。通过对 Nutch 工作流程的学习认识,先有一个整体的印象,然后可以很好 地指导我们去阅读学习它的源代码,从而更加深入理解 Nutch。 当然,也不是唯一的,在阅读一个框架的源代码的时候,只要你选择了一个好的突破口,然 后按照基于深度遍历的特性来学习理解,也能起到一定效果。但是,这种方式有点像是对着 一个黑盒进行研究,对一个有机体没有一个整体的把握,也就是说没有整体概念,或者整体 概念有点模糊。 Nutch 工作流程 先展示一个相当生动的图片,它描述了 Nutch 的工作流程,如图所示: 其实,只要你沿着图中的数字标号的顺序就可以简单地了解一下 Nutch 的工作流程。这里, 涉及到一些比较重要的目录,比如 crawldb、index、indexes、linkdb、segments,这些目 www.73cc.com 6 录是在你使用 Nutch 抓取程序执行抓取任务的过程中,在文件系统中生成的,在文章 http://hi.baidu.com/shirdrn/blog/item/f92312ef58a260e9ce1b3ef9.html 中执行命令: $ sh ./bin/nutch crawl urls -dir mydir -depth 2 -threads 4 -topN 50 >&./logs/mynutchlog.log 之后,可以在 mydir 目录下面看到上面列举出的 5 个目录,简单介绍如下: crawdb,linkdb 是 web link 目录,存放 url 及 url 的互联关系,作为爬行与重新爬行的依据,页面默认 30 天过期。 segments 是主目录,存放抓回来的网页。页面内容有 bytes[]的 raw content 和 parsed text 的形式。nutch 以广度优 先的原则来爬行,因此每爬完一轮会生成一个 segment 目录。 index 是 lucene 的索引目录,是 indexes 目录里所有 index 合并后的完整索引,注意索引文件只对页面内容进行 索引,没有进行存储,因此查询时要去访问 segments 目录才能获得页面内容。 如果你研究过 Lucene,相信 index 和 indexes 目录中的文件会非常熟悉的,他们是索引文件,使用不同扩 展名的文件来存储不同的内容,比如,.nrm 文件是存储标准化因子信息的,.fnm 文件是存储文件名字信息 的,.prx 文件是存储 Term 的词频信息的,等等。 因为在上面的工作流程图中出现了上面的这些目录,对理解 Nutch 的工作流程很重要。下 面,根据图中的流程将 Nutch 的工作流程分为两个部分进行综述。 以 index 为分隔线,因为第一部分,在左侧,是抓取网页并分析处理;第二部分是基于 index 索引库提供检索功能。 第一部分流程综述: 1.)inject start urls 注入抓取 URL。因为 Nutch 的抓取程序要抓取网页,而定位到某个(或者某些)网页需要指定 一个 URL,在此基础上,Nutch 按照广度遍历策略进行抓取,会根据一开始指定的 URL(可 以是一个 URL 集合:URLs),以此为基础进行抓取工作。 2.)generate segment 生成 segment。Nutch 抓取程序需要抓取到很多的页面,那么具体是哪些页面的?当然,在 互联网上是通过 URL 来定位的。这一步骤主要是对上一步提交的 URL 集合进行分析,确定 抓取任务的详细信息。 fetch list 分析提交的 URL 集合之后,建立一个抓取任务列表。在以后的抓取工作中就可以根据预处 理的此列表进行工作了。 www.73cc.com 7 www 这是通过访问万维网(www),实现抓取工作。 3.)fetch content 开始根据前面生成的抓取任务列表中指定的 URL 对应的页面,这时候开始抓取工作了。 fetched content 需要将抓取到的这些页面文件存放到指定的位置,这些页面文件可以是经过简单预处理以后 而被存储到文件系统中,也可以是原生的网页文件,以备后继流程基于这些文件来进一步处 理,比如分词,建立索引。 content parser 内容解析器。抓取到的页面文件被提交到这里,实现对页面文件的处理,包括页面文件的分 析和处理。 4.)parse content 当然,我们抓取的数据是结构和内容非常复杂的数据,而我们感兴趣的主要是文件的内容, 因为基于关键字检索的搜索引擎的实现,都是根据文本内容来实现的。 parsed text & data 通过 content parser 解析器,最终获取到的就是文本内容和其它一些可能需要用到的数据。 有了这些可以识别的文本内容和数据,就可以基于此来建立索引库,而且需要将本次抓取任 务的详细信息登录到 crawlDB,为下次抓取任务提供有用的信息(比如:避免重复抓取相同 的 URL 指定的页面)。 因此接下来分为两个方向:一个是索引,一个是更新 crawlDB 并继续执行抓取任务: indexing 这是一个索引的过程,对分析处理完成并提交的文本及其数据建立索引,通过索引文件就可 以实现信息的检索功能了。建立索引过程中,由于是基于 Lucene 的,所以用到了 Analyzer 分析器,对预处理的文件进行分析、过滤、分词等等,最后将写入到索引库,供搜索程序工 作使用。 update crawlDB with new extracted urls 根据网页分析处理获取到的信息,更新 crawlDB(爬行数据库),并根据提取到的抓取任务已 经注入的 URLs 循环执行抓取任务。 第二部分流程综述: 这部分比较简单了,就是启动 WEB 服务器,为用户提供检索服务。 www.73cc.com 8 这里,用户可以直接通过输入检索关键字,使用 Lucene 对用户检索关键字进行处理调用 Nutch 的搜索程序,基于索引库中存储的信息,来提取满足用户检索需求的信息。 Nutch 工作流程总结 上面的流程已经分析地非常透彻了,参考一些资料做个总结吧。通过下面总结的工作流程, 理清思路,很有帮助的,如下所示,Nutch 的工作流程描述: 抓取程序工作流程 (1.) 建立初始 URL 集 (2.) 将 URL 集注入 crawldb 数据库---inject 这一步骤,上面的图中没有涉及到。既然需要维护一个 crawlDB,那么在任何时候只要与抓取任务有关的 而且是有用的信息都会被写入 crawlDB 的 (3.) 根据 crawldb 数据库创建抓取列表---generate (4.) 执行抓取,获取网页信息---fetch (5.) 更新数据库,把获取到的页面信息存入数据库中---updatedb (6.) 重复进行 3~5 的步骤,直到预先设定的抓取深度。---这个循环过程被称为“产生/抓取/更新”循环 (7.) 根据 sengments 的内容更新 linkdb 数据库---invertlinks (8.) 建立索引---index 搜索程序工作流程 (1.) 用户通过用户接口进行查询操作 (2.) 将用户查询转化为 lucene 查询 (3.) 从索引库中提取满足用户检索需求的结果集 (4.) 返回结果 www.73cc.com 9 NUTCH 三 之前对 nutch 进行些分析,打算在基础上进行一些应用,不过最近忙着,也没弄 出个所以然,先把阅读心得贴出来,里边可能有不少理解上的错误,仅供参考用, 万一突然有人转载了,请保留 blog 出处 。也希望能认识跟多对此话题感兴 趣的朋友。 主要类分析: 一、org.apache.nutch.crawl.Injector: 1,注入 url.txt 2,url 标准化 3,拦截 url,进行正则校验(regex-urlfilter.txt) 4,对符 URL 标准的 url 进行 map 对构造,在构造过程中 给 CrawlDatum 初始化得分,分数可影响 url host 的搜索排序,和采集优先级! 5,reduce 只做一件事,判断 url 是不是在 crawldb 中已经存在,如果存在 则直接读取原来 CrawlDatum,如果是新 host,则把相应状态存储到里边 (STATUS_DB_UNFETCHED(状态意思为没有采集过)) 二、org.apache.nutch.crawl.Generator: 1,过滤不及格 url (使用 url 过滤插件) 2,检测 URL 是否在有效更新时间里 3,获取 URL metaData,metaData 记录了 url 上次更新时间 4,对 url 进行打分 5,将 url 载入相应任务组(以 host 为分组) 6,计算 url hash 值 7,收集 url, 直至到达 topN 指定量 三、org.apache.nutch.crawl.Fetcher: 1,从 segment 中读取,将它放入相应的队列中,队列以 queueId 为分类,而 queueId 是由 协议://ip 组成,在放入队列过程中, 如果不存在队列则创建(比如 javaeye 的所有地址都属于这个队列: http://221.130.184.141) --> queues.addFetchItem(url, datum); 2,检查机器人协议是否允许该 url 被爬行(robots.txt) --> protocol.getRobotRules(fit.url, fit.datum); www.73cc.com 10 3,检查 url 是否在有效的更新时间里 --> if (rules.getCrawlDelay() > 0) 4,针对不同协议采用不同的协议采用不同机器人,可以是 http、ftp、file, 这地方已经将内容保存下来(Content)。 --> protocol.getProtocolOutput(fit.url, fit.datum); 5,成功取回 Content 后,在次对 HTTP 状态进行识别(如 200、404)。--> case ProtocolStatus.SUCCESS: 6,内容成功保存,进入 ProtocolStatus.SUCCESS 区域,在这区域里,系统 对输出内容进行构造。 --> output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS); 7,在内容构造过程中,调取内容解析器插件(parseUtil),如 mp3\html\pdf\word\zip\jsp\swf„„ 。 --> this.parseUtil.parse(content); --> parsers[i].getParse(content); 8,我们现在研究 html 解析,所以只简略说明 HtmlParser,HtmlParser 中, 会解析出 text,title, outlinks, metadata。 text:过滤所有 HTML 元素;title:网页标题;outlinks:url 下的所 有链接;metadata:这东西分别做那么几件事情 首先检测 url 头部的 meta name="robots" 看看是否允许蜘蛛爬行, 其次通过对 meta http-equiv refresh 等属性进行识别记录,看页面 是否需要转向。 四、org.apache.nutch.parse.ParseSegment: 1,这个类逻辑就相对简单很多了哦,它对我们也是很有价值的,它只做一 件事情,就是对爬行下来的 Content(原始 HTML)进行解析,具体解析通过插件 来实现。 比如我们要做的数据分析、数据统计都可以在这进行实现。 2,执行完成后,输出三个 Map 对解析内容、 包含所有链接的分析后的结果 、outlinks 五、org.apache.nutch.crawl.CrawlDb: 主要根据 crawld_fatch 输出更新 crawldb。 1,map 对 crawld_fatch、crawldb 地址进行标准化(nomalizer)和拦截操 作(filte); 2,reduce 在对两 crawld_fatch 和 crawldb 进行合并更新。 六、org.apache.nutch.crawl.LinkDb: 这个类的作用是管理新转化进来的链接映射,并列出每个 url 的外部链接 (incoming links)。 1,先是对每一个 url 取出它的 outLinks,作 map 操作把这个 url 作为每个 outLinks 的 incoming link, 2,在 reduce 里把根据每个 key 来把一个 url 的所有 incoming link 都加到 inlinks 里。 3,这样就把每个 url 的外部链接统计出来了,注意,系统对只对外部链接 进行统计,什么叫外部链接呢,就是只对不同 host 进行统计, www.73cc.com 11 记住 javaeye.com 和 biaowen.javaeye.com 是两个不同的 host 哦。 --> boolean ignoreInternalLinks = true; 4,然后一步是对这些新加进来的链接进行合并。 七、org.apache.nutch.crawl.Indexer: 这个类的任务是另一方面的工作了,它是基于 hadoop 和 lucene 的分布式索 引。它就是为前面爬虫抓取回来的数据进行索引好让用户可以搜索到这些数据。 这里的输入就比较多了,有 segments 下的 fetch_dir,parseData 和 parseText,还有 crawldb 下的 current_dir 和 linkdb 下的 current_dir。 1,在这个类里,map 将所有输入都装载到一个容器里边, 2,在到 reduce 进行分类处理, 3,实现拦截 --> this.filters.filter(doc, parse, key, fetchDatum, inlinks); 4,打分 --> this.scfilters.indexerScore(key, doc, dbDatum,fetchDatum, parse, inlinks, boost); 5,当然要把这些数据体组合成一个 lucene 的 document 让它索引了。 6,在 reduce 里组装好后收集时是,最后在输出的 OutputFormat 类里进行真正的索引。 doc 里有如下几个 field content(正文) site (所属主地址) title (标题) host (host) segement (属于哪个 segement) digest (MD5 码,去重时候用到) tstamp (暂时不知道什么东西) url (当前 URL 地址) 载了一个例子: doc = {content=[biaowen - JavaEye 技术网站 首页 新闻 论 坛 博客 招聘 更多 ▼ 问答 „„„„„„ (内容省略)„„„„ biaowen 永 NF/ICP 备 05023328 号], site=[biaowen.javaeye.com], title=[biaowen - JavaEye 技术网站], host=[biaowen.javaeye.com], segment=[20090725083125], digest=[063ba8430fa84e614ce71276e176f4ce], tstamp=[20090725003318265], url=[http://biaowen.javaeye.com/]} 八、org.apache.nutch.crawl.DeleteDuplicates: 这个类的作用就是这它的名字所写的意思--去重。 前面索引后(当然不是一次时的情况)会有重复,所以要去重。为什么呢, 在一次索引时是不重复的,可是多次抓取后就会有重复了。 www.73cc.com 12 就是这个原因才要去重。当然去重的规则有两种一个是以时间为标准,一种 是以内容的 md5 值为标准。 九、org.apache.nutch.indexer.IndexMerger: 这个类就相对简单了,目的将多个 indexes 合并为一个 index,直接调用 lucene 方法实现! NUTCH 四 Nutch-0.9 源代码:Injector 类 出处: http://hi.baidu.com/shirdrn/blog/item/5d24ef2298e3eca24623e887.html 在对 Nutch 抓取工作流程分析中,已经简单地提及到了 inject 操作,如下所示: inject 操作调用的是 nutch 的核心包之一 crawl 包中的类 org.apache.nutch.crawl.Injector。它执行的结果是:crawldb 数据库内容得 到更新,包括 URL 及其状态。 inject 操作主要作用可以从下面 3 方面来说明: (1) 将 URL 集合进行格式化和过滤,消除其中的非法 URL,并设定 URL 状态 (UNFETCHED),按照一定方法进行初始化分值; (2) 将 URL 进行合并,消除重复的 URL 入口; (3) 将 URL 及其状态、分值存入 crawldb 数据库,与原数据库中重复的则删除旧 的,更换新的。 现在,根据上面的信息,可以进一步分析一下: 因为已经初始化了一个 URL 集合,那么这个集合中就存在多个 URL,由此可以想 到,如果由于输入错误可能导致错误的 URL 存在,所以 inject 操作需要对其进 行检查核实,合法的才会去执行抓取,否则执行中发现是错误的会浪费 CPU 这宝 贵资源。 另外,为了防止重复抓取 URL,需要设定一个标志位来标识该 URL 完成抓取与否。 那么这些信息应该被存放到某个地方,以备下次启动抓取工作的时候读取,存放 到哪里呢?当然是 CrawlDB 了,更新已经初始化的 CrawlDB 实体的实例信息,对 应于文件系统中的 crawldb 目录。 www.73cc.com 13 再考虑,如果本次抓取工作完成了,下次要启动了,同时也对应一个初始化的 URL 集合,那么这里面出现的 URL 可能在上次被抓取过,是否被抓取过,可以从 CrawlDB 中查看到详细的信息。对于重复的 URL 当然不希望再次被抓取(如果该 URL 对应的页面信息没有完全变更),应该忽略掉,这就涉及到了对抓取的页面 去除重复的 URL,这是应该做的,这也可以称为合并操作。 如果你了解 MapReduce 模型的话,现在已经能够想到,这里面可以实现 MapReduce 模型的,Nutch 就实现了 MapReduce 模型(实际上是在 Hadoop 中实现的,因为对 于每个 Map 和 Reduce 实现类都分别实现了 org.apache.hadoop.mapred.Mapper 接口与 org.apache.hadoop.mapred.Reducer)。 Mapper 实现对 URL 集合数据的映射,Reducer 实现了对 URL 的合并操作。这里提 及 MapReduce 模型,有助于对 Injector 类的两个静态内部类 InjectMapper 和 InjectReducer 理解。 org.apache.nutch.crawl.Injector 类实现了 org.apache.hadoop.util.ToolBase 抽象类,如下所示: public class Injector extends ToolBase 而 org.apache.hadoop.util.ToolBase 抽象类又实现了 org.apache.hadoop.util.Tool 接口。如果你对 Tool 类了解,及其配置部署过 Hadoop 自带的 WordCount 工具的时候,就理解了,实现 Tool 接口的实现类可以 通过 org.apache.hadoop.util.ToolRunner 类来启动执行 MapReduce 任务的工 具。 先看 Injector 类中如何实现 Map 的,InjectMapper 类的实现如下所示: Java 代码 1. /** 标准化初始化的 URLs,并且过滤注入的 URLs */ 2. public static class InjectMapper implements Mapper { // 实现 Mapper 接口,就要实现该 接口中定义的 map 函数 3. private URLNormalizers urlNormalizers; // URL 标准化工具,可以实现 URL 的标准 化 4. private float interval; // 设置抓取间隔时间 5. private float scoreInjected; // 设置注入 URL 对应页面的得分值 6. private JobConf jobConf; // 抓取工作配置实例 7. private URLFilters filters; // URL 过滤器 8. private ScoringFilters scfilters; // 得分过滤器 9. private long curTime; // 设置注入时间 10. 11. public void configure(JobConf job) { // 为一次抓取工作进行配置 www.73cc.com 14 12. this.jobConf = job; 13. urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT); 14. interval = jobConf.getFloat("db.default.fetch.interval", 30f); 15. filters = new URLFilters(jobConf); 16. scfilters = new ScoringFilters(jobConf); 17. scoreInjected = jobConf.getFloat("db.score.injected", 1.0f); 18. curTime = job.getLong("injector.current.time", System.currentTimeMillis()); 19. } 20. 21. public void close() {} 22. 23. public void map(WritableComparable key, Writable val, 24. OutputCollector output, Reporter reporter) 25. throws IOException { // map 函数的实现是核心 26. Text value = (Text)val; 27. String url = value.toString(); // 从初始化 URL 集合中读取一行,一行是一 个 URL 28. // System.out.println("url: " +url); 29. try { 30. url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); // 标准 化 URL 31. url = filters.filter(url); // 过滤 URL,去除不合法的 URL 32. } catch (Exception e) { 33. if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); } 34. url = null; 35. } 36. if (url != null) { // 如果合法,则解析该 URL 37. value.set(url); // 将合法的 URL 收集到 Text value 对象中 38. 39. // 其中,org.apache.nutch.crawl.CrawlDatum 类中定义了 URL 的各种可以设置的 状态,可以在该类的对象中设置与 URL 相关的有用的信息,比如注入状态、抓取间 隔时间,抓取时间、得分等等 40. 41. CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval ); 42. datum.setFetchTime(curTime); 43. datum.setScore(scoreInjected); 44. try { 45. scfilters.injectedScore(value, datum); 46. } catch (ScoringFilterException e) { 47. if (LOG.isWarnEnabled()) { 48. LOG.warn("Cannot filter injected score for url " + url + 49. ", using default (" + e.getMessage() + ")"); 50. } www.73cc.com 15 51. datum.setScore(scoreInjected); 52. } 53. output.collect(value, datum); // 收集 key/value 对,并输出结果 54. } 55. } 56. } 从上面 InjectMapper 类的实现可以看出,其中包含的 key 应该是 URL 本身,而 value 则是与注入的 URL 对应的信息,比如 URL 当前状态信息(是否已经抓取过, 或者不需要抓取)等等。 接着再看 InjectReducer 类,它实现了 Reduce 操作,代码如下所示: Java 代码 1. /** 为一个 URL 合并多个新的入口. */ 2. blic static class InjectReducer implements Reducer { 3. public void configure(JobConf job) {} 4. public void close() {} 5. 6. public void reduce(WritableComparable key, Iterator values, 7. OutputCollector output, Reporter reporter) 8. throws IOException { // reduce 函数的实现也是核心的 9. CrawlDatum old = null; 10. CrawlDatum injected = null; 11. while (values.hasNext()) { // 根据 Map 任务映射得到的迭代器,进行遍历得到的中 间结果 12. CrawlDatum val = (CrawlDatum)values.next(); 13. if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { // 如果某个 URL 已经注 入到 CrawlDB 14. injected = val; 15. injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); // 则设置这个 URL 对应的页面不用进行抓取 16. } else { 17. old = val; // 否则如果没有注入过,则需要对该 URL 对应的页面进行抓取 18. } 19. } 20. CrawlDatum res = null; 21. if (old != null) res = old; // 不要重写已经存在的 value 22. else res = injected; 23. 24. output.collect(key, res); // 收集 key/value 对;合并的最终结果是,使得将要注入到 CrawlDB 中 URL 没有重复的 25. } www.73cc.com 16 实现注入的操作是 Injector 类的 inject()方法中,如下所示: Java 代码 1. public void inject(Path crawlDb, Path urlDir) throws IOException { 2. 3. if (LOG.isInfoEnabled()) { 4. LOG.info("Injector: starting"); 5. LOG.info("Injector: crawlDb: " + crawlDb); 6. LOG.info("Injector: urlDir: " + urlDir); 7. } 8. 9. Path tempDir = 10. new Path(getConf().get("mapred.temp.dir", ".") + 11. "/inject-temp-"+ 12. Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); // 临时目录用 来存放 MapReduce 工作中生成的中间结果数据的 13. 14. // map text input file to a file 15. if (LOG.isInfoEnabled()) { 16. LOG.info("Injector: Converting injected urls to crawl db entries."); 17. } 18. JobConf sortJob = new NutchJob(getConf()); 19. sortJob.setJobName("inject " + urlDir); 20. sortJob.setInputPath(urlDir); 21. sortJob.setMapperClass(InjectMapper.class); 22. 23. sortJob.setOutputPath(tempDir); 24. sortJob.setOutputFormat(SequenceFileOutputFormat.class); 25. sortJob.setOutputKeyClass(Text.class); 26. sortJob.setOutputValueClass(CrawlDatum.class); 27. sortJob.setLong("injector.current.time", System.currentTimeMillis()); 28. JobClient.runJob(sortJob); 29. 30. // merge with existing crawl db 31. if (LOG.isInfoEnabled()) { 32. LOG.info("Injector: Merging injected urls into crawl db."); 33. } 34. JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); 35. mergeJob.addInputPath(tempDir); 36. mergeJob.setReducerClass(InjectReducer.class); 37. JobClient.runJob(mergeJob); 38. CrawlDb.install(mergeJob, crawlDb); www.73cc.com 17 39. 40. // 删除临时文件 41. FileSystem fs = new JobClient(getConf()).getFs(); 42. fs.delete(tempDir); 43. if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); } 该方法比较容易理解,在这里安装 CrawlDB 了,从而真正地将 URLs 注入到 CrawlDB 中了,更新 crawldb 目录中的数据。 要想执行 inject 操作,需要启动,这是在 Injector 类中的 run()方法中启动的, 如下所示: Java 代码 1. public int run(String[] args) throws Exception { 2. if (args.length < 2) { // 根据命令行进行启动 3. System.err.println("Usage: Injector "); 4. return -1; 5. } 6. try { 7. inject(new Path(args[0]), new Path(args[1])); // 调用 inject()方法 8. return 0; 9. } catch (Exception e) { 10. LOG.fatal("Injector: " + StringUtils.stringifyException(e)); 11. return -1; 12. } 13. } 最后,就是在 main 主函数中: Java 代码 1. public static void main(String[] args) throws Exception { 2. int res = new Injector().doMain(NutchConfiguration.create(), args); 3. System.exit(res); public static void main(String[] args) throws Exception { 其中,doMain()方法是在 org.apache.hadoop.util.ToolBase 抽象类中实现的, 就是通过 ToolRunner 工具启动工作,如下所示: Java 代码 1. public final int doMain(Configuration conf, String[] args) throws Exception { www.73cc.com 18 2. return ToolRunner.run(conf, this, args); 3. } 启动 inject 操作,实际所做的工作就是对 URL 进行预处理,检查每个初始化 URL 的合法性,从而更新到 CrawlDB 中。待 inject 操作完成之后,就可以执行 后继操作了——应该是生成抓取列表(generate 操作)。 NUTCH 五 Nutch-0.9 中,org.apache.nutch.crawl.Crawl 类中提供了一个入口主函数 main,通过接收键入的命令行,根据命令行指定的参数对 Nutch 进行配置,从而 启动 Nutch 抓取程序,通过阅读 org.apache.nutch.crawl.Crawl 类的源代码来 了解 Nutch 是如何根据接收的命令行进行配置及其启动的。 org.apache.nutch.crawl.Crawl 类的主函数如下所示: Java 代码 1. // 应该知道,Nutch 查找文件系统是基于 Linux 系统的机制的,所以提 供启动的命令与 Linux 的 Shell 命令很相似。 2. 3. public static void main(String args[]) throws Exception { 4. if (args.length < 1) { // 检查命令行参数是否合法,如果小于 1 个参数,则给出提示。 5. System.out.println 6. ("Usage: Crawl [-dir d] [-threads n] [-depth i ] [-topN N]"); 7. return; 8. } 9. 10. Configuration conf = NutchConfiguration.create(); // 使用静 态类 NutchConfiguration 创建一个 org.apache.hadoop.conf.Configuration 实例,可以在 Hadoop 的源代码 中查看到该类的定义(读取 hadoop-site.xml 配置文件) 11. conf.addDefaultResource("crawl-tool.xml"); // 读取并设置抓 取工具的配置文件,可以在 nutch-0.9\conf 目录下找到 crawl-tool.xml 文件 12. JobConf job = new NutchJob(conf); // 抓取任务配置实例的创 建 13. 14. Path rootUrlDir = null; // 初始 URLs 文件所在的目录,使用 Hadoop 的 org.apache.hadoop.fs.Path 类创建目录 www.73cc.com 19 15. Path dir = new Path("crawl-" + getDate()); // 设置默认抓取 到网页的存放目录。如果命令行中没有指定-dir 的值就会使用默认的值: crawl-时间。 16. int threads = job.getInt("fetcher.threads.fetch", 10); // 设置默认抓取工作启动线程数目,默认值为 10。 17. int depth = 5; // 默认抓取工作遍历深度,默认值为 5。 18. int topN = Integer.MAX_VALUE; // 抓取任务抓取网页的数量,默 认为最大值。 19. 20. for (int i = 0; i < args.length; i++) { // 根据读取的命令 行,设置抓取工作配置信息。 21. if ("-dir".equals(args[i])) { 22. dir = new Path(args[i+1]); 23. i++; 24. } else if ("-threads".equals(args[i])) { 25. threads = Integer.parseInt(args[i+1]); 26. i++; 27. } else if ("-depth".equals(args[i])) { 28. depth = Integer.parseInt(args[i+1]); 29. i++; 30. } else if ("-topN".equals(args[i])) { 31. topN = Integer.parseInt(args[i+1]); 32. i++; 33. } else if (args[i] != null) { 34. rootUrlDir = new Path(args[i]); 35. } 36. } 37. 38. FileSystem fs = FileSystem.get(job); // 根据抓取工作配置 JobConf 创建一个用来存放抓取到的网页的目录。 39. if (fs.exists(dir)) { 40. throw new RuntimeException(dir + " already exists."); // 如果该目录已经存在,则发生运行时异常。 41. } 42. 43. if (LOG.isInfoEnabled()) { // 登录日志信息 44. LOG.info("crawl started in: " + dir); 45. LOG.info("rootUrlDir = " + rootUrlDir); 46. LOG.info("threads = " + threads); 47. LOG.info("depth = " + depth); 48. if (topN != Integer.MAX_VALUE) 49. LOG.info("topN = " + topN); 50. } 51. www.73cc.com 20 52. // 在目录 dir 下面创建下面 5 个目录,用来存放,抓取工作过程中 不同操作生成的文件或者目录。 53. 54. Path crawlDb = new Path(dir + "/crawldb"); 55. Path linkDb = new Path(dir + "/linkdb"); 56. Path segments = new Path(dir + "/segments"); 57. Path indexes = new Path(dir + "/indexes"); 58. Path index = new Path(dir + "/index"); 59. 60. Path tmpDir = job.getLocalPath("crawl"+Path.SEPARATOR+getDa te()); 61. Injector injector = new Injector(conf); // 根据 Configuration conf 创建一个 Injector 实例 62. Generator generator = new Generator(conf); // 根据 Configuration conf 创建一个 Generator 实例 63. Fetcher fetcher = new Fetcher(conf); // 根据 Configuration conf 创建一个 Fetcher 实例 64. ParseSegment parseSegment = new ParseSegment(conf); // 根据 Configuration conf 创建一个 ParseSegment 实例 65. CrawlDb crawlDbTool = new CrawlDb(conf); // 根据 Configuration conf 创建一个 CrawlDb 实例 66. LinkDb linkDbTool = new LinkDb(conf); // 根据 Configuration conf 创建一个 LinkDb 实例 67. Indexer indexer = new Indexer(conf); // 根据 Configuration conf 创建一个 Indexer 实例 68. DeleteDuplicates dedup = new DeleteDuplicates(conf); // 根 据 Configuration conf 创建一个 DeleteDuplicates 实例 69. IndexMerger merger = new IndexMerger(conf); // 根据 Configuration conf 创建一个 IndexMerger 实例 70. 71. // 初始化 crawlDb 72. injector.inject(crawlDb, rootUrlDir); // 从 rootUrlDir 目录 中读取初始化 URLs,将 URLs 注入到 CrawlDb 实体中去 73. int i; 74. for (i = 0; i < depth; i++) { // 在 segment 文件 中生成抓取工作列表 75. Path segment = generator.generate(crawlDb, segments, -1, topN, System 76. .currentTimeMillis(), false, false); 77. if (segment == null) { 78. LOG.info("Stopping at depth=" + i + " - no more URLs to fetch."); 79. break; 80. } www.73cc.com 21 81. fetcher.fetch(segment, threads); // 根据配置的线程数开始 抓取网页文件 82. if (!Fetcher.isParsing(job)) { 83. parseSegment.parse(segment); // 解析网页文件 84. } 85. crawlDbTool.update(crawlDb, new Path[]{segment}, true, tr ue); // 更新 CrawlDb 86. } 87. if (i > 0) { 88. linkDbTool.invert(linkDb, segments, true, true, false); / / invert links 89. 90. indexer.index(indexes, crawlDb, linkDb, fs.listPaths(segmen ts)); // 索引过程 91. dedup.dedup(new Path[] { indexes }); // 复制索引文件 92. merger.merge(fs.listPaths(indexes), index, tmpDir); // 将 索引目录 index 中的索引文件合并后写入到 indexes 目录中 93. } else { 94. LOG.warn("No URLs to fetch - check your seed list and URL filters."); 95. } 96. if (LOG.isInfoEnabled()) { LOG.info("crawl finished: " + di r); } 97.} 通过上面的源代码的整体分析,总结一下具体都在这里做了哪些工作: 1、读取命令行参数,合法以后才继续初始化配置实例; 2、通过读取 hadoop-site.xml 配置文件,初始化一个 Configuration 实例,并 根据 crawl-tool.xml 文件内容设置抓取工作配置; 3、设置一些默认抓取工作参数,如果命令行中没有指定就会按照默认的参数值 进行抓取工作的执行,比如,抓取工作抓取到的网页文件存放目录 rootUrlDir、 启动的抓取工作进程数 threads、抓取深度 depth、抓取网页数量 topN; 4、创建抓取工作抓取到的网页文件的存放目录,及其子目录(crawldb、linkdb、 segments、indexes、index),这些子目录有的是用来存放原生网页,有的是预 处理网页文件,有的是网页解析出的文本内容及其其它相关数据等等; 5、在抓取工作及其索引过程中,要进行很多操作来对网页文件或数据进行处理, 这通过初始化一些实现了这些操作的类的实例来完成的,例如:Injector、 www.73cc.com 22 Generator、Fetcher、ParseSegment、CrawlDb、LinkDb、Indexer、 DeleteDuplicates、IndexMerger。这些类中,有的是作为实体类,像 CrawlDb、 LinkDb,它们需要在抓取工作执行过程中及时更新,保持数据状态的处于良好正 确状态。 6、最后,就开始执行相关操作了,包括初始化 CrawlDb、生成抓取工作列表、 抓取网页文件、更新 CrawlDb、倒排 Links、建立索引、复制索引文件、合并索 引文件。 NUTCH 六 Nutch-0.9 源代码:Injector 类 出处: http://hi.baidu.com/shirdrn/blog/item/5d24ef2298e3eca24623e887.html 在对 Nutch 抓取工作流程分析中,已经简单地提及到了 inject 操作,如下所示: inject 操作调用的是 nutch 的核心包之一 crawl 包中的类 org.apache.nutch.crawl.Injector。它执行的结果是:crawldb 数据库内容得 到更新,包括 URL 及其状态。 inject 操作主要作用可以从下面 3 方面来说明: (1) 将 URL 集合进行格式化和过滤,消除其中的非法 URL,并设定 URL 状态 (UNFETCHED),按照一定方法进行初始化分值; (2) 将 URL 进行合并,消除重复的 URL 入口; (3) 将 URL 及其状态、分值存入 crawldb 数据库,与原数据库中重复的则删除旧 的,更换新的。 现在,根据上面的信息,可以进一步分析一下: 因为已经初始化了一个 URL 集合,那么这个集合中就存在多个 URL,由此可以想 到,如果由于输入错误可能导致错误的 URL 存在,所以 inject 操作需要对其进 行检查核实,合法的才会去执行抓取,否则执行中发现是错误的会浪费 CPU 这宝 贵资源。 另外,为了防止重复抓取 URL,需要设定一个标志位来标识该 URL 完成抓取与否。 www.73cc.com 23 那么这些信息应该被存放到某个地方,以备下次启动抓取工作的时候读取,存放 到哪里呢?当然是 CrawlDB 了,更新已经初始化的 CrawlDB 实体的实例信息,对 应于文件系统中的 crawldb 目录。 再考虑,如果本次抓取工作完成了,下次要启动了,同时也对应一个初始化的 URL 集合,那么这里面出现的 URL 可能在上次被抓取过,是否被抓取过,可以从 CrawlDB 中查看到详细的信息。对于重复的 URL 当然不希望再次被抓取(如果该 URL 对应的页面信息没有完全变更),应该忽略掉,这就涉及到了对抓取的页面 去除重复的 URL,这是应该做的,这也可以称为合并操作。 如果你了解 MapReduce 模型的话,现在已经能够想到,这里面可以实现 MapReduce 模型的,Nutch 就实现了 MapReduce 模型(实际上是在 Hadoop 中实现的,因为对 于每个 Map 和 Reduce 实现类都分别实现了 org.apache.hadoop.mapred.Mapper 接口与 org.apache.hadoop.mapred.Reducer)。 Mapper 实现对 URL 集合数据的映射,Reducer 实现了对 URL 的合并操作。这里提 及 MapReduce 模型,有助于对 Injector 类的两个静态内部类 InjectMapper 和 InjectReducer 理解。 org.apache.nutch.crawl.Injector 类实现了 org.apache.hadoop.util.ToolBase 抽象类,如下所示: public class Injector extends ToolBase 而 org.apache.hadoop.util.ToolBase 抽象类又实现了 org.apache.hadoop.util.Tool 接口。如果你对 Tool 类了解,及其配置部署过 Hadoop 自带的 WordCount 工具的时候,就理解了,实现 Tool 接口的实现类可以 通过 org.apache.hadoop.util.ToolRunner 类来启动执行 MapReduce 任务的工 具。 先看 Injector 类中如何实现 Map 的,InjectMapper 类的实现如下所示: Java 代码 1. /** 标准化初始化的 URLs,并且过滤注入的 URLs */ 2. public static class InjectMapper implements Mapper { // 实现 Mapper 接口,就要实现该 接口中定义的 map 函数 3. private URLNormalizers urlNormalizers; // URL 标准化工具,可以实现 URL 的标准 化 4. private float interval; // 设置抓取间隔时间 www.73cc.com 24 5. private float scoreInjected; // 设置注入 URL 对应页面的得分值 6. private JobConf jobConf; // 抓取工作配置实例 7. private URLFilters filters; // URL 过滤器 8. private ScoringFilters scfilters; // 得分过滤器 9. private long curTime; // 设置注入时间 10. 11. public void configure(JobConf job) { // 为一次抓取工作进行配置 12. this.jobConf = job; 13. urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT); 14. interval = jobConf.getFloat("db.default.fetch.interval", 30f); 15. filters = new URLFilters(jobConf); 16. scfilters = new ScoringFilters(jobConf); 17. scoreInjected = jobConf.getFloat("db.score.injected", 1.0f); 18. curTime = job.getLong("injector.current.time", System.currentTimeMillis()); 19. } 20. 21. public void close() {} 22. 23. public void map(WritableComparable key, Writable val, 24. OutputCollector output, Reporter reporter) 25. throws IOException { // map 函数的实现是核心 26. Text value = (Text)val; 27. String url = value.toString(); // 从初始化 URL 集合中读取一行,一行是一 个 URL 28. // System.out.println("url: " +url); 29. try { 30. url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT); // 标准 化 URL 31. url = filters.filter(url); // 过滤 URL,去除不合法的 URL 32. } catch (Exception e) { 33. if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); } 34. url = null; 35. } 36. if (url != null) { // 如果合法,则解析该 URL 37. value.set(url); // 将合法的 URL 收集到 Text value 对象中 38. 39. // 其中,org.apache.nutch.crawl.CrawlDatum 类中定义了 URL 的各种可以设置的 状态,可以在该类的对象中设置与 URL 相关的有用的信息,比如注入状态、抓取间 隔时间,抓取时间、得分等等 40. 41. CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval ); 42. datum.setFetchTime(curTime); 43. datum.setScore(scoreInjected); www.73cc.com 25 44. try { 45. scfilters.injectedScore(value, datum); 46. } catch (ScoringFilterException e) { 47. if (LOG.isWarnEnabled()) { 48. LOG.warn("Cannot filter injected score for url " + url + 49. ", using default (" + e.getMessage() + ")"); 50. } 51. datum.setScore(scoreInjected); 52. } 53. output.collect(value, datum); // 收集 key/value 对,并输出结果 54. } 55. } 56. } 从上面 InjectMapper 类的实现可以看出,其中包含的 key 应该是 URL 本身,而 value 则是与注入的 URL 对应的信息,比如 URL 当前状态信息(是否已经抓取过, 或者不需要抓取)等等。 接着再看 InjectReducer 类,它实现了 Reduce 操作,代码如下所示: Java 代码 1. /** 为一个 URL 合并多个新的入口. */ 2. blic static class InjectReducer implements Reducer { 3. public void configure(JobConf job) {} 4. public void close() {} 5. 6. public void reduce(WritableComparable key, Iterator values, 7. OutputCollector output, Reporter reporter) 8. throws IOException { // reduce 函数的实现也是核心的 9. CrawlDatum old = null; 10. CrawlDatum injected = null; 11. while (values.hasNext()) { // 根据 Map 任务映射得到的迭代器,进行遍历得到的中 间结果 12. CrawlDatum val = (CrawlDatum)values.next(); 13. if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { // 如果某个 URL 已经注 入到 CrawlDB 14. injected = val; 15. injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); // 则设置这个 URL 对应的页面不用进行抓取 16. } else { 17. old = val; // 否则如果没有注入过,则需要对该 URL 对应的页面进行抓取 18. } 19. } www.73cc.com 26 20. CrawlDatum res = null; 21. if (old != null) res = old; // 不要重写已经存在的 value 22. else res = injected; 23. 24. output.collect(key, res); // 收集 key/value 对;合并的最终结果是,使得将要注入到 CrawlDB 中 URL 没有重复的 25. } 实现注入的操作是 Injector 类的 inject()方法中,如下所示: Java 代码 1. public void inject(Path crawlDb, Path urlDir) throws IOException { 2. 3. if (LOG.isInfoEnabled()) { 4. LOG.info("Injector: starting"); 5. LOG.info("Injector: crawlDb: " + crawlDb); 6. LOG.info("Injector: urlDir: " + urlDir); 7. } 8. 9. Path tempDir = 10. new Path(getConf().get("mapred.temp.dir", ".") + 11. "/inject-temp-"+ 12. Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); // 临时目录用 来存放 MapReduce 工作中生成的中间结果数据的 13. 14. // map text input file to a file 15. if (LOG.isInfoEnabled()) { 16. LOG.info("Injector: Converting injected urls to crawl db entries."); 17. } 18. JobConf sortJob = new NutchJob(getConf()); 19. sortJob.setJobName("inject " + urlDir); 20. sortJob.setInputPath(urlDir); 21. sortJob.setMapperClass(InjectMapper.class); 22. 23. sortJob.setOutputPath(tempDir); 24. sortJob.setOutputFormat(SequenceFileOutputFormat.class); 25. sortJob.setOutputKeyClass(Text.class); 26. sortJob.setOutputValueClass(CrawlDatum.class); 27. sortJob.setLong("injector.current.time", System.currentTimeMillis()); 28. JobClient.runJob(sortJob); 29. 30. // merge with existing crawl db www.73cc.com 27 31. if (LOG.isInfoEnabled()) { 32. LOG.info("Injector: Merging injected urls into crawl db."); 33. } 34. JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); 35. mergeJob.addInputPath(tempDir); 36. mergeJob.setReducerClass(InjectReducer.class); 37. JobClient.runJob(mergeJob); 38. CrawlDb.install(mergeJob, crawlDb); 39. 40. // 删除临时文件 41. FileSystem fs = new JobClient(getConf()).getFs(); 42. fs.delete(tempDir); 43. if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); } 该方法比较容易理解,在这里安装 CrawlDB 了,从而真正地将 URLs 注入到 CrawlDB 中了,更新 crawldb 目录中的数据。 要想执行 inject 操作,需要启动,这是在 Injector 类中的 run()方法中启动的, 如下所示: Java 代码 1. public int run(String[] args) throws Exception { 2. if (args.length < 2) { // 根据命令行进行启动 3. System.err.println("Usage: Injector "); 4. return -1; 5. } 6. try { 7. inject(new Path(args[0]), new Path(args[1])); // 调用 inject()方法 8. return 0; 9. } catch (Exception e) { 10. LOG.fatal("Injector: " + StringUtils.stringifyException(e)); 11. return -1; 12. } 13. } 最后,就是在 main 主函数中: Java 代码 1. public static void main(String[] args) throws Exception { 2. int res = new Injector().doMain(NutchConfiguration.create(), args); 3. System.exit(res); www.73cc.com 28 其中,doMain()方法是在 org.apache.hadoop.util.ToolBase 抽象类中实现的, 就是通过 ToolRunner 工具启动工作,如下所示: Java 代码 1. public final int doMain(Configuration conf, String[] args) throws Exception { 2. return ToolRunner.run(conf, this, args); 3. } 启动 inject 操作,实际所做的工作就是对 URL 进行预处理,检查每个初始化 URL 的合法性,从而更新到 CrawlDB 中。待 inject 操作完成之后,就可以执行 后继操作了——应该是生成抓取列表(generate 操作)。 NUTCH 七 Nutch 中 MapReduce 的分析 出处: http://www.hadoop.org.cn/mapreduce/nutch-mapreduce/ Nutch 是最早用 MapReduce 的项目 (Hadoop 其实原来是 Nutch 的一部分),Nutch 的 plugin 机制吸取了 eclipse 的 plugin 设计思路。在 Nutch 中 MapReduce 编 程方式占据了其核心的结构大部分。从插入 url 列表(Inject),生成抓取列表 (Generate),抓取内容(Fetch), 分析处理内容(Parse),更新 Crawl DB 库(Update ),转化链接(Invert Links)一直到建立索引(Index)都是采 用 MapReduce 来完成的。查看 Nutch 的源代码我们能够学到更多的 如何用 MapReduce 来处理我们编程中所遇到的问题。 Nutch 从获取下载列表到建立索引的过程: 插入 url 列表到 Crawl DB,引导下面的抓取程序 循环: – 从 Crawl DB 生成一些 url 列表; – 抓取内容; – 分析处理抓取的内容; – 更新 Crawl DB 库. 转化每个页面中外部对它的链接 建立索引 www.73cc.com 29 具体技术实现细节: 1。插入 url 列表(Inject) MapReduce 程序 1: 目标:转换 input 输入为 CrawlDatum 格式. 输入: url 文件 Map(line) → Reduce()合并多重的 Url. 输出:临时的 CrawlDatum 文件. MapReduce2: 目标:合并上一步产生的临时文件到新的 DB 输入: 上次 MapReduce 输出的 CrawlDatum Map()过滤重复的 url. Reduce: 合并两个 CrawlDatum 到一个新的 DB 输出:CrawlDatum 2。生成抓取列表(Generate) MapReduce 程序 1: 目标:选择抓取列表 输入: Crawl DB 文件 Map() → 如果抓取当前时间大于现在时间 ,抓换成 格式. 分发器(Partition) :用 url 的 host 保证同一个站点分发到同 一个 Reduce 程序上. Reduce:取最顶部的 N 个链接. MapReduce 程序 2: 目标:准备抓取 Map() 抓换成 格式 分发器(Partition) :用 url 的 host 输出:文件 3。抓取内容(Fetch) MapReduce: 目标:抓取内容 输入: , 按 host 划分, 按 hash 排序 Map(url,CrawlDatum) → 输出 多线程, 调用 Nutch 的抓取协议插件,抓取输出 www.73cc.com 30 输出: , 两个文件 4。分析处理内容(Parse) MapReduce: 目标:处理抓取的能容 输入: 抓取的 Map(url, Content) → 调用 Nutch 的解析插件,输出处理完的格式是 输出: , . 5。更新 Crawl DB 库(Update ) MapReduce: 目标: 整合 fetch 和 parse 到 DB 中 输入: 现有的 db 加上 fetch 和 parse 的输 出,合并上面 3 个 DB 为一个新的 DB 输出: 新的抓取 DB 6。转化链接(Invert Links) MapReduce: 目标:统计外部页面对本页面链接 输入: , 包含页面往外的链接 Map(srcUrl, ParseData> → 搜集外部对本页面的链接 Inlinks 格式: Reduce() 添加 inlinks 输出: 7。建立索引(Index) MapReduce: 目标:生成 Lucene 索引 输入: 多种文件格式 parse 处理完的 提取 title, metadata 信息 等 parse 处理完的 提取 text 内容 www.73cc.com 31 转换链接处理完的 提取 anchors 抓取内容处理完的 提取抓取时间. Map() 用 ObjectWritable 包裹上面的内容 Reduce() 调用 Nutch 的索引插件,生成 Lucene Document 文档 输出: 输出 Lucene 索引 NUTCH 八 一):Nutch 的工作流程: Crawdb、linkdb 是 web link 目录,存放 url 及 url 的互联关系,作为爬行与重新爬行的依据。 segments 是主目录,存放抓回来的网页。页面内容有 bytes[]的 raw content 和 parsed text 的形式。nutch 以广度优先的原则来爬行,因此每爬完一轮会生成一个 segment 目录。 index 是 lucene 的索引目录,是 indexes 目录里所有 index 合并后的完整索引,注意索 引文件只对页面内容进行索引,没有进行存储,因此查询时要去访问 segments 目录才能获得页面内容。 (一):流程综述: 【1】:inject start urls 注入抓取 URL。因为 Nutch 的抓取程序是要抓取网页,而定位到某个网页需要 制定一个 URL,所以 Nutch 按照广度遍历策略进行抓取,会根据一开始指定的 URL,也可以是一个 URLS 集合,以此基础进行抓取工作。 【2】:generate segment 生成 segment。Nutch 抓取程序需要抓取到很多的页面,那么具体是哪些页面的? 当然,在互联网上是通过 URL 来定位的。这一步骤主要是对上一步提交的 URL 集合进行分析,确定抓取任务的详细信息。 【3】fetch list 分析提交的 URL 集合之后,建立一个抓取任务列表。在以后的抓取工作中就可 以根据预处理的此列表进行工作了。 【4】:www www.73cc.com 32 这是通过访问万维网(www),实现抓取工作。 【5】:fetch content 开始根据前面生成的抓取任务列表中指定的 URL 对应的页面,这时候开始抓取 工作了。 【6】:fetched content 需要将抓取到的这些页面文件存放到指定的位置,这些页面文件可以是经过简单 预处理以后而被存储到文件系统中,也可以是原生的网页文件,以备后继流程基 于这些文件来进一步处理,比如分词,建立索引。 content parser 内容解析器。抓取到的页面文件被提交到这里,实现对页面文件的处理,包括页 面文件的分析和处理。 【7】:parse content 当然,我们抓取的数据是结构和内容非常复杂的数据,而我们感兴趣的主要是文 件的内容,因为基于关键字检索的搜索引擎的实现,都是根据文本内容来实现的。 【8】:parsed text & data 通过 content parser 解析器,最终获取到的就是文本内容和其它一些可能需要用 到的数据。有了这些可以识别的文本内容和数据,就可以基于此来建立索引库, 而且需要将本次抓取任务的详细信息登录到 crawlDB,为下次抓取任务提供有用 的信息(比如:避免重复抓取相同的 URL 指定的页面)。 因此接下来分为两个方向:一个是索引,一个是更新 crawlDB 并继续执行抓取 任务: 【9】:indexing 这是一个索引的过程,对分析处理完成并提交的文本及其数据建立索引,通过索 引文件就可以实现信息的检索功能了。建立索引过程中,由于是基于 Lucene 的, 所以用到了 Analyzer 分析器,对预处理的文件进行分析、过滤、分词等等,最 后将写入到索引库,供搜索程序工作使用。 【10】:update crawlDB with new extracted urls 根据网页分析处理获取到的信息,更新 crawlDB(爬行数据库),并根据提取到的 抓取任务已经注入的 URLs 循环执行抓取任务。 (二):Nutch 工作流程总结 上面的流程已经分析地非常透彻了,参考一些资料做个总结吧。通过下面总结的 工作流程,理清思路,很有帮助的,如下所示,Nutch 的工作流程描述: 抓取程序工作流程 (1.) 建立初始 URL 集 (2.) 将 URL 集注入 crawldb 数据库---inject 这一步骤,上面的图中没有涉及到。既然需要维护一个 crawlDB,那么在任何时 候只要与抓取任务有关的而且是有用的信息都会被写入 crawlDB 的 (3.) 根据 crawldb 数据库创建抓取列表---generate (4.) 执行抓取,获取网页信息---fetch (5.) 更新数据库,把获取到的页面信息存入数据库中---updatedb (6.) 重复进行 3~5 的步骤,直到预先设定的抓取深度。---这个循环过程被称为 “产生/抓取/更新”循环 (7.) 根据 sengments 的内容更新 linkdb 数据库---invertlinks www.73cc.com 33 (8.) 建立索引---index 搜索程序工作流程 (1.) 用户通过用户接口进行查询操作 (2.) 将用户查询转化为 lucene 查询 (3.) 从索引库中提取满足用户检索需求的结果集 (4.) 返回结果 NUTCH 九 nutch 源代码阅读心得 Posted on 2010-04-23 11:05 泰仔在线 阅读(230) 评论(0) 编辑 收藏 所属分类: 云计算相关 主要类分析: 一、 org.apache.nutch.crawl.Injector: 1,注入 url.txt 2,url 标准化 3,拦截 url,进行正则校验(regex-urlfilter.txt) 4,对符 URL 标准的 url 进行 map 对构造,在构造过程中给 CrawlDatum 初始化得分,分数可影响 url host 的搜索排序,和采集优先级! 5,reduce 只做一件事,判断 url 是不是在 crawldb 中已经存在,如果存在则直接读取原 来 CrawlDatum,如果是新 host,则把相应状态存储到里边(STATUS_DB_UNFETCHED(状 态意思为没有采集过)) 二、org.apache.nutch.crawl.Generator: 1,过滤不及格 url (使用 url 过滤插件) 2,检测 URL 是否在有效更新时间里 3,获取 URL metaData,metaData 记录了 url 上次更新时间 4,对 url 进行打分 5,将 url 载入相应任务组(以 host 为分组) 6,计算 url hash 值 7,收集 url, 直至到达 topN 指定量 三、 org.apache.nutch.crawl.Fetcher: www.73cc.com 34 1,从 segment 中读取,将它放入相应的队列中,队列以 queueId 为分类,而 queueId 是由 协议://ip 组成,在放入队列过程中, 如果不存在队列则创建(比如 javaeye 的所有地址都属于这个队列: http://221.130.184.141) --> queues.addFetchItem(url, datum); 2,检查机器人协议是否允许该 url 被爬行(robots.txt) --> protocol.getRobotRules(fit.url, fit.datum); 3,检查 url 是否在有效的更新时间里 --> if (rules.getCrawlDelay() > 0) 4,针对不同协议采用不同的协议采用不同机器人,可以是 http、ftp、file,这地方已经将 内容保存下来(Content)。 --> protocol.getProtocolOutput(fit.url, fit.datum); 5,成功取回 Content 后,在次对 HTTP 状态进行识别(如 200、404)。--> case ProtocolStatus.SUCCESS: 6,内容成功保存,进入 ProtocolStatus.SUCCESS 区域,在这区域里,系统对输出内容 进行构造。 --> output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS); 7,在内容构造过程中,调取内容解析器插件(parseUtil),如 mp3\html\pdf\word\zip\jsp\swf……。 --> this.parseUtil.parse(content); --> parsers[i].getParse(content); 8,我们现在研究 html 解析,所以只简略说明 HtmlParser,HtmlParser 中,会解析出 text,title, outlinks, metadata。 text:过滤所有 HTML 元素;title:网页标题;outlinks:url 下的所有链接;metadata: 这东西分别做那么几件事情 首先检测 url 头部的 meta name="robots" 看看是否允许蜘蛛爬 行, 其次通过对 meta http-equiv refresh 等属性进行识别记录,看页面是否需要转向。 四、 org.apache.nutch.parse.ParseSegment: 1,这个类逻辑就相对简单很多了哦,它对我们也是很有价值的,它只做一件事情,就是对 爬行下来的 Content(原始 HTML)进行解析,具体解析通过插件来实现。 比如我们要做的数据分析、数据统计都可以在这进行实现。 2,执行完成后,输出三个 Map 对解析内容、包含所 有链接的分析后的结果 、outlinks 五、org.apache.nutch.crawl.CrawlDb: 主要根据 crawld_fatch 输出更新 crawldb。 1,map 对 crawld_fatch、crawldb 地址进行标准化(nomalizer)和拦截操作(filte); 2,reduce 在对两 crawld_fatch 和 crawldb 进行合并更新。 六、org.apache.nutch.crawl.LinkDb: 这个类的作用是管理新转化进来的链接映射,并列出每个 url 的外部链接(incoming links)。 1,先是对每一个 url 取出它的 outLinks,作 map 操作把这个 url 作为每个 outLinks 的 incoming link, 2,在 reduce 里把根据每个 key 来把一个 url 的所有 incoming link 都加到 inlinks 里。 3,这样就把每个 url 的外部链接统计出来了,注意,系统对只对外部链接进行统计,什么叫 外部链接呢,就是只对不同 host 进行统计, 记住 javaeye.com 和 biaowen.javaeye.com 是两个不同的 host 哦。 --> boolean www.73cc.com 35 ignoreInternalLinks = true; 4,然后一步是对这些新加进来的链接进行合并。 七、 org.apache.nutch.crawl.Indexer: 这个类的任务是另一方面的工作了,它是基于 hadoop 和 lucene 的分布式索引。它就是为 前面爬虫抓取回来的数据进行索引好让用户可以搜索到这些数据。 这里的输入就比较多了,有 segments 下的 fetch_dir,parseData 和 parseText,还有 crawldb 下的 current_dir 和 linkdb 下的 current_dir。 1,在这个类里,map 将所有输入都装载到一个容器里边, 2,再到 reduce 进行分类处理, 3,实现拦截 --> this.filters.filter(doc, parse, key, fetchDatum, inlinks); 4,打分 --> this.scfilters.indexerScore(key, doc, dbDatum,fetchDatum, parse, inlinks, boost); 5,当然要把这些数据体组合成一个 lucene 的 document 让它索引了。 6,在 reduce 里组装好后收集时是,最后在输出的 OutputFormat 类里进行真 正的索引。 doc 里有如下几个 field content(正文) site (所属主地址) title (标题) host (host) segement (属于哪个 segement) digest (MD5 码,去重时候用到) tstamp (时间戳) url (当前 URL 地址) 载了一个例子: doc = {content=[biaowen - JavaEye 技术网站 首页 新闻 论坛 博客 招聘 更 多 ▼ 问答 ………………(内容省略)………… biaowen 永 NF/ICP 备 05023328 号], site=[biaowen.javaeye.com], title=[biaowen - JavaEye 技术网站], host=[biaowen.javaeye.com], segment=[20090725083125], digest=[063ba8430fa84e614ce71276e176f4ce], tstamp=[20090725003318265], url=[http://biaowen.javaeye.com/]} 八、 org.apache.nutch.crawl.DeleteDuplicates: 这个类的作用就是这它的名字所写的意思--去重。 前面索引后(当然不是一次时的情况)会有重复,所以要去重。为什么呢,在一次索引时是 不重复的,可是多次抓取后就会有重复了。 就是这个原因才要去重。当然去重的规则有两种一个是以时间为标准,一种是以内容的 md5 值为标准。 www.73cc.com 36 九、org.apache.nutch.indexer.IndexMerger: 这个类就相对简单了,目的将多个 indexes 合并为一个 index,直接调用 lucene 方法实现! NTUCH 十

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 20 金币 [ 分享文档获得金币 ] 16 人已下载

下载文档

相关文档