Nutch 1.0 源代码分析


Nutch 1.0 源代码分析[1] Injector   21 MAR 2010 12:55:42 +0800 ----------------------------------------------------------------------------            在Crawl中的main函数中有一句是: // initialize crawlDb injector.inject(crawlDb, rootUrlDir); 引用[李阳]:inject操作调用的是nutch的核心包之一crawl包中的类Injector。 inject操作主要作用: 1.       将URL集合进行格式化和过滤,消除其中的非法URL,并设定URL状态(UNFETCHED),按照一定方法进行初始化分值; 2.       将URL进行合并,消除重复的URL入口; 3.       将URL及其状态、分值存入crawldb数据库,与原数据库中重复的则删除旧的,更换新的。 inject操作结果:crawldb数据库内容得到更新,包括URL及其状态。 看一下inject调用的函数: public void inject(Path crawlDb, Path urlDir) throws IOException { //产生一个文件名是随机的临时文件夹 Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/inject-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));   // map text input file to a file // 产生 key-value对的文件 JobConf sortJob = new NutchJob(getConf()); sortJob.setJobName("inject " + urlDir); FileInputFormat.addInputPath(sortJob, urlDir); sortJob.setMapperClass(InjectMapper.class);   FileOutputFormat.setOutputPath(sortJob, tempDir); sortJob.setOutputFormat(SequenceFileOutputFormat.class); sortJob.setOutputKeyClass(Text.class); sortJob.setOutputValueClass(CrawlDatum.class); sortJob.setLong("injector.current.time", System.currentTimeMillis()); JobClient.runJob(sortJob); 这里用的是hadoop的东西,输入文件目录为:用户指定的url目录。输出目录为:产生的那个临时文件夹。这里的SequenceFileOutputFormat在中的解释为:Imagine a logfile, where each log record is a new line of text. If you want to log binary types, plain text isn’t a suitable format. Hadoop’s S equenceFile class fits the bill in this situation, providing a persistent data structure for binary key-value pairs.,这里是用map函数产生对的文件。 public void map(WritableComparable key, Text value, OutputCollector output, Reporter reporter) throws IOException { String url = value.toString(); // value is line of text try { url = urlNormalizers .normalize(url, URLNormalizers.SCOPE_INJECT); url = filters.filter(url); // filter the url } catch (Exception e) { } if (url != null) { // if it passes value.set(url); // collect it CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval); datum.setFetchTime(curTime); datum.setScore(scoreInjected); try { scfilters.injectedScore(value, datum); } catch (ScoringFilterException e) { } output.collect(value, datum); } } urlNormalizers是用于规范化url,而filters用于过滤不合法的url。Map输出的key是url而value是CrawlDatum,这里设置它的几个成员变量的值: private byte status; private long fetchTime = System.currentTimeMillis(); private int fetchInterval; private float score = 1.0f; ScoringFilters是一个计算分数的类。 Inject函数的后一部分: // merge with existing crawl db JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb); FileInputFormat.addInputPath(mergeJob, tempDir); mergeJob.setReducerClass(InjectReducer.class); JobClient.runJob(mergeJob); CrawlDb.install(mergeJob, crawlDb);   // clean up FileSystem fs = FileSystem.get(getConf()); fs.delete(tempDir, true); if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); } } mergeJob把刚才的临时目录当作输入目录,输出在install函数里处理,最终删除那个临时目录。下面看一下InjectReducer类: /** Combine multiple new entries for a url. 
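 * (A URL may reach this reducer with several CrawlDatum values: the freshly
 * injected one, marked STATUS_INJECTED, and possibly an entry that already
 * exists in the crawldb; the reduce method keeps the existing entry and only
 * falls back to the injected one for genuinely new URLs.)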
*/ public static class InjectReducer implements Reducer { private CrawlDatum old = new CrawlDatum(); private CrawlDatum injected = new CrawlDatum();   public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { boolean oldSet = false; while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getStatus() == CrawlDatum.STATUS_INJECTED) { injected.set(val); injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED); } else { old.set(val); oldSet = true; } } CrawlDatum res = null; if (oldSet) res = old; // don't overwrite existing value else res = injected;   output.collect(key, res); } } 这里reduce,对,因为没有必要一个url关联着多个Datum,这里判断CrawlDatum的状态,如果它是STATUS_INJECTED,也就是新被注入的,设置injected值,如果是STATUS_DB_UNFETEDED,未被抓取的,就设置old的值。这里要注意的一点是如果的确是注入的过,就将res设为old,否则才设为injected。 在CrawlDb类的install中: public static void install(JobConf job, Path crawlDb) throws IOException { Path newCrawlDb = FileOutputFormat.getOutputPath(job); FileSystem fs = new JobClient(job).getFs(); Path old = new Path(crawlDb, "old"); Path current = new Path(crawlDb, CURRENT_NAME); if (fs.exists(current)) { if (fs.exists(old)) fs.delete(old, true); fs.rename(current, old); } fs.mkdirs(crawlDb); fs.rename(newCrawlDb, current); if (fs.exists(old)) fs.delete(old, true); Path lock = new Path(crawlDb, LOCK_NAME); LockUtil.removeLockFile(fs, lock); } newCrawlDb大概是crawl/crawldb/216164146,而old是crawl/crawldb/old,current是crawl/crawldb/current,如果有current,就将它重命为old,再创建crawlDb,再将它重命为current,如果old是存在的,删除,如果是有锁的,把锁删除。     Nutch 1.0 源代码分析[2] Plugin(1)   21 MAR 2010 12:58:47 +0800 ---------------------------------------------------------------------------- 借着URLNormalizers看一下Nutch的插件机制,在Injector类中的configure类中有一句是: urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT); 它调用的是: public URLNormalizers(Configuration conf, String scope) { this.conf = conf; this.extensionPoint = PluginRepository .get(conf).getExtensionPoint(URLNormalizer.X_POINT_ID); ObjectCache objectCache = ObjectCache.get(conf);   normalizers = (URLNormalizer[]) objectCache .getObject(URLNormalizer.X_POINT_ID + "_" + scope); if (normalizers == null) { normalizers = getURLNormalizers(scope); } if (normalizers == EMPTY_NORMALIZERS) { normalizers = (URLNormalizer[]) objectCache .getObject(URLNormalizer.X_POINT_ID + "_" + SCOPE_DEFAULT); if (normalizers == null) { normalizers = getURLNormalizers(SCOPE_DEFAULT); } }   loopCount = conf.getInt("urlnormalizer.loop.count", 1); } 这里的getExtensionPoint是得到相应的扩展点,这里URLNormailizer.X_POINT_ID是org.apache.nutch.net.URLNormalizer,关于扩展点可以看一下IBM的技术文章《Nutch 插件系统浅析》,接下来先到缓存中去找,如果没有找到就调用getURLNormalizers(),如果normalizers==EMPTY_NORMALIZERS说明它应该在缓存里有,如果缓存里存的是null,就用默认的normalizer,而loopCount是在规范化时指定要循环多少次的一个值。getURLNormalizers代码如下: URLNormalizer[] getURLNormalizers(String scope) { List extensions = getExtensions(scope);   List normalizers = new Vector(extensions.size());   Iterator it = extensions.iterator(); while (it.hasNext()) { Extension ext = it.next(); URLNormalizer normalizer = null; try { normalizer = (URLNormalizer) objectCache .getObject(ext.getId()); if (normalizer == null) { // go ahead and instantiate it and then cache it normalizer = (URLNormalizer) ext.getExtensionInstance(); objectCache.setObject(ext.getId(), normalizer); } normalizers.add(normalizer); } catch (PluginRuntimeException e) { } } return normalizers.toArray(new URLNormalizer[normalizers.size()]); } 得到相应scope的Extensions,先不去管它是如何得到的,这里将得到的Extension实例化,保存到normalizers中。下面则是getExtensions的代码: private List getExtensions(String scope) { 
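// The per-scope extension list is cached under the key
// URLNormalizer.X_POINT_ID + "_x_" + scope; when nothing is found, the
// EMPTY_EXTENSION_LIST sentinel is cached instead, so the extension point
// is only searched once per scope.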
ObjectCache objectCache = ObjectCache.get(conf); List extensions = (List) objectCache .getObject(URLNormalizer.X_POINT_ID + "_x_" + scope);   // Just compare the reference: // if this is the empty list, we know we will find no extension. if (extensions == EMPTY_EXTENSION_LIST) { return EMPTY_EXTENSION_LIST; }   if (extensions == null) { extensions = findExtensions(scope); if (extensions != null) { objectCache.setObject(URLNormalizer.X_POINT_ID + "_x_" + scope, extensions); } else { // Put the empty extension list into cache // to remember we don't know any related extension. objectCache.setObject(URLNormalizer.X_POINT_ID + "_x_" + scope, EMPTY_EXTENSION_LIST); extensions = EMPTY_EXTENSION_LIST; } } return extensions; } 还是一样,先从缓存里找,如果没有找到,则调用findExtensions,如果是null就在缓存中保存null,而如果找到,则在缓存里放入空的Extension列表。findExtension的代码如下: private List findExtensions(String scope) { String[] orders = null; String orderlist = conf.get("urlnormalizer.order." + scope); if (orderlist == null) orderlist = conf.get("urlnormalizer.order"); if (orderlist != null && !orderlist.trim().equals("")) { orders = orderlist.split("\\s+"); } String scopelist = conf.get("urlnormalizer.scope." + scope); Set impls = null; if (scopelist != null && !scopelist.trim().equals("")) { String[] names = scopelist.split("\\s+"); impls = new HashSet(Arrays.asList(names)); } Extension[] extensions = this.extensionPoint.getExtensions(); HashMap normalizerExtensions = new HashMap(); for (int i = 0; i < extensions.length; i++) { Extension extension = extensions[i]; if (impls != null && !impls.contains(extension.getClazz())) continue; normalizerExtensions.put(extension.getClazz(), extension); } List res = new ArrayList(); if (orders == null) { res.addAll(normalizerExtensions.values()); } else { // first add those explicitly named in correct order for (int i = 0; i < orders.length; i++) { Extension e = normalizerExtensions.get(orders[i]); if (e != null) { res.add(e); normalizerExtensions.remove(orders[i]); } } // then add all others in random order res.addAll(normalizerExtensions.values()); } return res; } 关于scope可以看一下注释You can define a set of contexts (or scopes) in which normalizers may be called. Each scope can have its own list of normalizers (defined in "urlnormalizer.scope." property) and its own order (defined in "urlnormalizer.order." property). If any of these properties are missing, default settings are used for the global scope.,大意是你可以再定义一个更小的域,处理关于这个子域的特征情况,比如在Injector中调用,可以指定urlnormalizer.order.injector。默认情况下,在injector域中,在配置中使用的是默认的urlnormalizer.order,相应的配置在nutch-default中: urlnormalizer.order org.apache.nutch.net.urlnormalizer.basic.BasicURLNormalizer org.apache.nutch.net.urlnormalizer.regex.RegexURLNormalizer Order in which normalizers will run. If any of these isn't activated it will be silently skipped. If other normalizers not on the list are activated, they will run in random order after the ones specified here are run. 接下来的代码是从urlnormalizer.order中得到相应的normalizer顺序,再从扩展点中得到相应的normalizerExtension,然后根据normalizer的顺序将extensions放到res中。 在Injector中调用normalize的代码是: url = urlNormalizers .normalize(url, URLNormalizers.SCOPE_INJECT); URLNormalizers的注释写到:This class uses a "chained filter" pattern to run defined normalizers. 
也就是要按配置中的顺序依次进行normalize: public String normalize(String urlString, String scope) throws MalformedURLException { // optionally loop several times, and break if no further changes String initialString = urlString; for (int k = 0; k < loopCount; k++) { for (int i = 0; i < this.normalizers.length; i++) { if (urlString == null) return null; urlString = this.normalizers[i].normalize(urlString, scope); } if (initialString.equals(urlString)) break; initialString = urlString; } return urlString; } 用normlizers中的 Normalizer对象进行规范化,到loopCount次或与上次规范化的结果已经没有差异了,那么停止。          Nutch 1.0 源代码分析[3] Plugin(2)   21 MAR 2010 12:59:55 +0800 ---------------------------------------------------------------------------- 在URLNormalizers构造函数中,有一句没有看: this.extensionPoint = PluginRepository.get(conf).getExtensionPoint( URLNormalizer.X_POINT_ID); 看一下PluginRepository.get函数: public static synchronized PluginRepository get(Configuration conf) { PluginRepository result = CACHE.get(conf); if (result == null) { result = new PluginRepository(conf); CACHE.put(conf, result); } return result; } 先试着从CACHE取,如果没有被缓存过,那么就调用PluginRepository的构造函数: public PluginRepository(Configuration conf) throws RuntimeException { fActivatedPlugins = new HashMap(); fExtensionPoints = new HashMap(); this.conf = conf; this.auto = conf.getBoolean("plugin.auto-activation", true); String[] pluginFolders = conf.getStrings("plugin.folders"); PluginManifestParser manifestParser = new PluginManifestParser(conf, this); Map allPlugins = manifestParser .parsePluginFolder(pluginFolders); Pattern excludes = Pattern.compile(conf.get("plugin.excludes", "")); Pattern includes = Pattern.compile(conf.get("plugin.includes", "")); Map filteredPlugins = filter(excludes, includes, allPlugins); fRegisteredPlugins = getDependencyCheckedPlugins(filteredPlugins, this.auto ? allPlugins : filteredPlugins); installExtensionPoints(fRegisteredPlugins); try { installExtensions(fRegisteredPlugins); } catch (PluginRuntimeException e) { LOG.fatal(e.toString()); throw new RuntimeException(e.getMessage()); } displayStatus(); } 这里拷贝一点《Nutch插件系统浅析》里的介绍: 1.       plugin.folders:插件所在的目录,缺省位置在 plugins 目录下。 plugin.folders plugins 2.       plugin.auto-activation:当被配置为过滤(即不加载),但是又被其他插件依赖的时候,是否自动启动,缺省为 true。 plugin.auto-activation true 3.       plugin.includes:要包含的插件名称列表,支持正则表达式方式定义。 plugin.includes protocol-http|urlfilter-regex|parse-(text|html|js)|index-(basic|anchor) |query-(basic|site|url)|response-(json|xml)|summary-basic|scoring-opic| urlnormalizer-(pass|regex|basic) 4.       
plugin.excludes:要排除的插件名称列表,支持正则表达式方式定义。 plugin.excludes 构造函数中的filter是将所有includes加入,excludes排除,下面是一段代码: for (PluginDescriptor plugin : plugins.values()) { String id = plugin.getPluginId();   if (!includes.matcher(id).matches()) { continue; } if (excludes.matcher(id).matches()) { continue; } map.put(plugin.getPluginId(), plugin); } 如果在includes里不包含排除,如果在excludes里包含,排除。回到构造函数中,如果auto为true,则安装全部插件,如果为false则只安装过滤过的。 现在看PluginManifestParser的构造函数: public PluginManifestParser(Configuration conf, PluginRepository pluginRepository) { this.conf = conf; this.pluginRepository = pluginRepository; } 接下来看parsePluginFolder: public Map parsePluginFolder( String[] pluginFolders) { Map map = new HashMap();   for (String name : pluginFolders) { File directory = getPluginFolder(name); if (directory == null) { continue; }   for (File oneSubFolder : directory.listFiles()) { if (oneSubFolder.isDirectory()) { String manifestPath = oneSubFolder.getAbsolutePath() + File.separator + "plugin.xml"; try { PluginDescriptor p = parseManifestFile(manifestPath); map.put(p.getPluginId(), p); } } } } return map; } 默认的pluginFolders是[plugins],将plugins下面的子目录下的plugin.xml取到,然后解析这个xml。parseManifestFile的代码如下: private PluginDescriptor parseManifestFile(String pManifestPath) throws MalformedURLException, SAXException, IOException, ParserConfigurationException { Document document = parseXML(new File(pManifestPath).toURL()); String pPath = new File(pManifestPath).getParent(); return parsePlugin(document, pPath); } 这已经都非常细节了,将xml解析后得到document,得到它的父目录地址后,调用parsePlugin,这个就更细节了,还是拷贝一段解释: 1. runtime 属性描述了其需要的 Jar 包,和发布的 Jar 包 2. requires 属性描述了依赖的插件 3. extension-point 描述了本插件宣布可扩展的扩展点 4. extension 属性则描述了扩展点的实现 典型的插件定义: 插件的提供者ID   依赖的Jar包 发布的Jar包   依赖的插件   扩展的扩展点ID 实现类 实现的相关属性 感觉还有一点代码比较重要: private void installExtensions(List pRegisteredPlugins) throws PluginRuntimeException { for (PluginDescriptor descriptor : pRegisteredPlugins) { for (Extension extension : descriptor.getExtensions()) { String xpId = extension.getTargetPoint(); ExtensionPoint point = getExtensionPoint(xpId); if (point == null) { } point.addExtension(extension); } } } 这里相当于是把所有的实现对应到相同的Extension point上。 最后调用: private void installExtensionPoints(List plugins) { for (PluginDescriptor plugin : plugins) { for (ExtensionPoint point : plugin.getExtenstionPoints()) { String xpId = point.getId(); fExtensionPoints.put(xpId, point); } } } 这里把extension的id和ExtensionPoint对加入到fExtensionPoints里。 在URLNormalizers里得到所有扩展是很简单的: Extension[] extensions = this.extensionPoint.getExtensions(); 实例化一个URLNormalizer: normalizer = (URLNormalizer) ext.getExtensionInstance(); getExtensionInstance的代码: public Object getExtensionInstance() throws PluginRuntimeException { // Must synchronize here to make sure creation and initialization // of a plugin instance and it extension instance are done by // one and only one thread. synchronized (getId()) { try { PluginClassLoader loader = fDescriptor.getClassLoader(); Class extensionClazz = loader.loadClass(getClazz()); // lazy loading of Plugin in case there is no instance of the // plugin already. 
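// getPluginInstance() creates and caches the owning Plugin object the first
// time one of its extensions is requested, so the plugin itself is set up
// before the extension class below is instantiated.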
this.pluginRepository.getPluginInstance(getDescriptor());
Object object = extensionClazz.newInstance();
if (object instanceof Configurable) {
    ((Configurable) object).setConf(this.conf);
}
return object;
} } }
fDescriptor.getClassLoader() is where the plugin's jars and related libraries are added.

Nutch 1.0 源代码分析[4] Generator(1)   24 MAR 2010 18:37:16 +0800 ----------------------------------------------------------------------------
The second important class used in Crawl is Generator; the line that calls generate is:
Path segment = generator.generate(crawlDb, segments, -1, topN, System.currentTimeMillis());
generate mainly delegates to an overloaded version of itself:
JobConf job = new NutchJob(getConf());
boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
return generate(dbDir, segments, numLists, topN, curTime, filter, false);
The first half of that overloaded function is:
Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-" + System.currentTimeMillis());
Path segment = new Path(segments, generateSegmentName());
Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
FileSystem fs = FileSystem.get(getConf());
LockUtil.createLockFile(fs, lock, force);
As in Injector, a temporary directory is created, with a name like /tmp/hadoop-daowu/mapred/temp/generate-temp-1268187730024. The segment path looks like crawl/segments/20100310102408, and output is the output directory, e.g. crawl/segments/20100310102408/crawl_generate; a lock file is then created under the crawldb directory (dbDir).
// map to inverted subset due for fetch, sort by score
JobConf job = new NutchJob(getConf());
job.setJobName("generate: select " + segment);
if (numLists == -1) { // for politeness make
    numLists = job.getNumMapTasks(); // a partition per fetch task
}
if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
    numLists = 1;
}
job.setLong(CRAWL_GEN_CUR_TIME, curTime);
// record real generation time
long generateTime = System.currentTimeMillis();
job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
job.setLong(CRAWL_TOP_N, topN);
job.setBoolean(CRAWL_GENERATE_FILTER, filter);
FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(Selector.class);
job.setPartitionerClass(Selector.class);
job.setReducerClass(Selector.class);
FileOutputFormat.setOutputPath(job, tempDir);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(FloatWritable.class);
job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
job.setOutputValueClass(SelectorEntry.class);
try {
    JobClient.runJob(job);
} catch (IOException e) {
    LockUtil.removeLockFile(fs, lock);
    throw e;
}
The input path is the Injector's output path and the input format is of course the Injector's output format. The Mapper, Partitioner and Reducer are all implemented by Selector. The output path is the temporary directory; the output key is a FloatWritable and the value is a SelectorEntry.
The important lines in the mapper implemented by Selector are:
float sort = 1.0f;
sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);
// sort by decreasing score, using DecreasingFloatComparator
sortValue.set(sort);
// record generation time
crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
entry.datum = crawlDatum;
entry.url = (Text) key;
output.collect(sortValue, entry); // invert for sort by score
Here ScoringFilters computes the sort score. The value is a SelectorEntry object with two members: the CrawlDatum (the value read from the input file, plus one extra metadata entry, _ngt_, the generation time) and the url.
The getPartition implemented in Selector is:
private Partitioner hostPartitioner = new PartitionUrlByHost();
public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
    return hostPartitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
}
The code of getPartition in PartitionUrlByHost is as follows:
public int
getPartition(Text key, Writable value, int numReduceTasks) { String urlString = key.toString(); try { urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION); } URL url = null; try { url = new URL(urlString); } int hashCode = (url == null ? urlString : url.getHost()).hashCode();   // make hosts wind up in different partitions on different runs hashCode ^= seed;   return (hashCode & Integer.MAX_VALUE) % numReduceTasks; } 这里是通过url.getHost().hashCode来进行划分的。 把reducer分开看: while (values.hasNext() && count < limit) { SelectorEntry entry = values.next(); Text url = entry.url; String urlString = url.toString(); URL u = null;   // skip bad urls, including empty and null urls try { u = new URL(url.toString()); }   String host = u.getHost(); host = host.toLowerCase(); String hostname = host;   if (byIP) { if (maxedHosts.contains(host)) { continue; } if (dnsFailureHosts.contains(host)) { continue; } try { InetAddress ia = InetAddress.getByName(host); host = ia.getHostAddress(); urlString = new URL(u.getProtocol(), host, u.getPort(), u.getFile()).toString(); } catch (UnknownHostException uhe) { // remember hostnames that could not be looked up dnsFailureHosts.add(hostname); dnsFailure++; continue; } }   Limit是generate的参数topN,就是爬前多少个,得到url的host后,判断这个host是不是已经达到了一定的抓取url数,出现过就不再处理,因为当初partitioner就用host来分的。这里会判断dns能否解析,如果不能,就将它加入到dnsFailureHosts中,下次就不用再处理它了。 try { urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT); host = new URL(urlString).getHost(); }   // only filter if we are counting hosts if (maxPerHost > 0) { IntWritable hostCount = hostCounts.get(host); if (hostCount == null) { hostCount = new IntWritable(); hostCounts.put(host, hostCount); }   // increment hostCount hostCount.set(hostCount.get() + 1);   // skip URL if above the limit per host. if (hostCount.get() > maxPerHost) { if (hostCount.get() == maxPerHost + 1) { // remember the raw hostname that is maxed out maxedHosts.add(hostname); } continue; } }   output.collect(key, entry); 记录下每一个host相应的url数量,如果已经超过,就加入到maxedHosts中。 Reducer将输出结果以得分降序的方式写入文件。 public static class DecreasingFloatComparator extends FloatWritable.Comparator { /** Compares two FloatWritables decreasing. 
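 * (The two arguments are simply swapped before delegating to the parent
 * FloatWritable comparator, turning the ascending order into a descending
 * one so that higher-scoring entries come first.)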
*/ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return super.compare(b2, s2, l2, b1, s1, l1); } } 这里就是把比较顺序反写了。    Nutch 1.0 源代码分析[5] Generator(2)   24 MAR 2010 18:39:10 +0800 ---------------------------------------------------------------------------- Generate的中间部分为: job = new NutchJob(getConf()); job.setJobName("generate: partition " + segment);   job.setInt("partition.url.by.host.seed", new Random().nextInt());   FileInputFormat.addInputPath(job, tempDir); job.setInputFormat(SequenceFileInputFormat.class);   job.setMapperClass(SelectorInverseMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(SelectorEntry.class); job.setPartitionerClass(PartitionUrlByHost.class); job.setReducerClass(PartitionReducer.class); job.setNumReduceTasks(numLists);   FileOutputFormat.setOutputPath(job, output); job.setOutputFormat(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CrawlDatum.class); job.setOutputKeyComparatorClass(HashComparator.class); try { JobClient.runJob(job); } catch (IOException e) { LockUtil.removeLockFile(fs, lock); fs.delete(tempDir, true); throw e; } 输入目录是前面的临时目录,输出目录是上一篇看过的。 Map函数在SelectorInverserMapper中实现的: public static class SelectorInverseMapper extends MapReduceBase implements Mapper { public void map(FloatWritable key, SelectorEntry value, OutputCollector output, Reporter reporter) throws IOException { SelectorEntry entry = (SelectorEntry) value; output.collect(entry.url, entry); } } Map是以entry中的url作为key将entry分开。Partitioner使用的是PartitionUrlByHost,这个已经看过了。 Reducer是PartitionReducer实现: public static class PartitionReducer extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { // if using HashComparator, we get only one input key in case of // hash collision so use only URLs from values while (values.hasNext()) { SelectorEntry entry = values.next(); output.collect(entry.url, entry.datum); } } } 这里将一个host的所有entry合到一起。注释写到为了防止冲空,这里只用url作为key。 /** Sort fetch lists by hash of URL. */ public static class HashComparator extends WritableComparator { public int compare(WritableComparable a, WritableComparable b) { Text url1 = (Text) a; Text url2 = (Text) b; int hash1 = hash(url1.getBytes(), 0, url1.getLength()); int hash2 = hash(url2.getBytes(), 0, url2.getLength()); return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1)); }   private static int hash(byte[] bytes, int start, int length) { int hash = 1; // make later bytes more significant in hash code, so that sorting // by hashcode correlates less with by-host ordering. 
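// the loop walks the URL bytes from the last one back to the first, so the
// tail of the URL (the path) rather than the shared "protocol://host" prefix
// dominates the resulting hash value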
for (int i = length - 1; i >= 0; i--) hash = (31 * hash) + (int) bytes[start + i]; return hash; } } 这里的hash值计算的注释写到,让后面的字节在hash值中更重要,这让以hashcode排序时,就不会太与host名称相关了。好比http://163.com/special/与http://163.com/sports/,如果是从开始向结尾循环,那http://163.com/这几个就是相同的,那hash值就是相近的。这样就分不开,也就是说这就不是一个好的hash函数。 Generate的最后一部分: // update the db from tempDir Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-" + System.currentTimeMillis());   job = new NutchJob(getConf()); job.setJobName("generate: updatedb " + dbDir); job.setLong(Nutch.GENERATE_ 24 MAR 2010 18:39:10 +0800_KEY, generateTime); FileInputFormat.addInputPath(job, tempDir); FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME)); job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(CrawlDbUpdater.class); job.setReducerClass(CrawlDbUpdater.class); job.setOutputFormat(MapFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CrawlDatum.class); FileOutputFormat.setOutputPath(job, tempDir2); try { JobClient.runJob(job); CrawlDb.install(job, dbDir); } fs.delete(tempDir2, true); 这里输入目录是刚才select job的输出目录和inject的输出目录,map函数和reduce函数在CrawlDbUpdate中实现: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException { if (key instanceof FloatWritable) { // tempDir source SelectorEntry se = (SelectorEntry) value; output.collect(se.url, se.datum); } else { output.collect((Text) key, (CrawlDatum) value); } } 这里判断是从哪个文件读来的,如果是tempDir,它的key是FloatWritable,而如果是current它的key是url,所以要分别处理。 public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { while (values.hasNext()) { CrawlDatum val = values.next(); if (val.getMetaData().containsKey( Nutch.WRITABLE_GENERATE_ 24 MAR 2010 18:39:10 +0800_KEY)) { LongWritable gt = (LongWritable) val.getMetaData().get( Nutch.WRITABLE_GENERATE_ 24 MAR 2010 18:39:10 +0800_KEY); genTime.set(gt.get()); if (genTime.get() != generateTime) { orig.set(val); genTime.set(0L); continue; } } else { orig.set(val); } } if (genTime.get() != 0L) { orig.getMetaData().put(Nutch.WRITABLE_GENERATE_ 24 MAR 2010 18:39:10 +0800_KEY, genTime); } output.collect(key, orig); } 在reduce里面判断,如果WRITABLE_GENERATE_ 24 MAR 2010 18:39:10 +0800_KEY的值表明不是这次generate的,就将genTime设为0。              Nutch 1.0 源代码分析[6] Fetcher(1)   24 MAR 2010 18:40:16 +0800 ---------------------------------------------------------------------------- 在Crawl中调用fetch的代码为: fetcher.fetch(segment, threads, org.apache.nutch.fetcher.Fetcher .isParsing(conf)); // fetch it Fetch的代码为: JobConf job = new NutchJob(getConf()); job.setJobName("fetch " + segment);   job.setInt("fetcher.threads.fetch", threads); job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); job.setBoolean("fetcher.parse", parsing);   // for politeness, don't permit parallel execution of a single task job.setSpeculativeExecution(false);   FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME)); job.setInputFormat(InputFormat.class);   job.setMapRunnerClass(Fetcher.class);   FileOutputFormat.setOutputPath(job, segment); job.setOutputFormat(FetcherOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NutchWritable.class);   JobClient.runJob(job); Job输入目录是generator产生的crawl/segments/20100310102408/crawl_generate目录,输出目录为crawl/segments/20100310102408。 Fetch的run函数中 public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException { int threadCount = 
getConf().getInt("fetcher.threads.fetch", 10);   feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); feeder.start();   for (int i = 0; i < threadCount; i++) { // spawn threads new FetcherThread(getConf()).start(); }   do { // wait for threads to exit try { Thread.sleep(1000); } reportStatus();   } while (activeThreads.get() > 0); } 这里默认启动10个线程,并在所有线程结束后退出。而在run里面删掉的几行代码中: this.fetchQueues = new FetchItemQueues(getConf());   feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); feeder.start(); QueueFeeder继承自Thread,它的run函数为: public void run() { boolean hasMore = true; int cnt = 0;   while (hasMore) { int feed = size - queues.getTotalSize(); if (feed <= 0) { try { Thread.sleep(1000); } continue; } else { while (feed > 0 && hasMore) { try { Text url = new Text(); CrawlDatum datum = new CrawlDatum(); hasMore = reader.next(url, datum); if (hasMore) { queues.addFetchItem(url, datum); cnt++; feed--; } } } } } } 这里的size值是threadCount * 50,如果size-queues.getTotalSize(),表示队列还是满的,不需要处理。否则从reader中读取一个新的url和datum,加入到queues里,这个reader就是读取generator产生的数据。 addFetchItem的代码如下: public void addFetchItem(Text url, CrawlDatum datum) { FetchItem it = FetchItem.create(url, datum, byIP); if (it != null) addFetchItem(it); } Create的代码如下: public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) { String queueID; URL u = null; try { u = new URL(url.toString()); } String proto = u.getProtocol().toLowerCase(); String host; if (byIP) { try { InetAddress addr = InetAddress.getByName(u.getHost()); host = addr.getHostAddress(); } } else { host = u.getHost(); if (host == null) { return null; } host = host.toLowerCase(); } queueID = proto + "://" + host; return new FetchItem(url, u, datum, queueID); } 得到url的协议proto,如果使用ip来做,就得到host的IP地址,否则就是host的名称。queueID为proto://host。 public void addFetchItem(FetchItem it) { FetchItemQueue fiq = getFetchItemQueue(it.queueID); fiq.addFetchItem(it); totalSize.incrementAndGet(); } getFetchItemQueue的代码如下: public synchronized FetchItemQueue getFetchItemQueue(String id) { FetchItemQueue fiq = queues.get(id); if (fiq == null) { fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay); queues.put(id, fiq); } return fiq; } 可以看出FetchItemQueue是:每一个protocal://host一个FetchItemQueue,这个FetchItemQueue的构造函数中设置最大线程数,爬的时候的时间间隔和最小爬的时间间隔。 总结一下。 FetchItem是描述一个要被抓取的项,成员变量: String queueID; Text url; URL u; CrawlDatum datum; FetchItemQueue是处理来自由同一主机ID(或是proto/hostname,或是proto/IP对),它记录正在进行的请求和请求之间的时间。成员变量为: List queue = Collections .synchronizedList(new LinkedList()); Set inProgress = Collections .synchronizedSet(new HashSet()); AtomicLong nextFetchTime = new AtomicLong(); long crawlDelay; long minCrawlDelay; int maxThreads; FetchItemQueues是一个提供简单处理的类,它记录着所有项的,并可以更方便被函数调用,成员变量为: Map queues = new HashMap(); AtomicInteger totalSize = new AtomicInteger(0); int maxThreads; boolean byIP; long crawlDelay; long minCrawlDelay; QueueFeeder是一个填充queue的类,它会自动填补那些被FetcherThread消耗的项。成员变量: private RecordReader reader; private FetchItemQueues queues; private int size; 在FetcherThread的run函数中,fetchQueues.getFetchItem()就是从队列中得到一个项: public synchronized FetchItem getFetchItem() { Iterator> it = queues.entrySet() .iterator(); while (it.hasNext()) { FetchItemQueue fiq = it.next().getValue(); // reap empty queues if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) { it.remove(); continue; } FetchItem fit = fiq.getFetchItem(); if (fit != null) { totalSize.decrementAndGet(); return fit; } } return null; } 它从queues中得到key的集合,再从这里面取得一个FetchItemQueue,如果取完,就删除这个队列。            Nutch 1.0 源代码分析[7] 
Fetcher(2)   24 MAR 2010 18:41:37 +0800 ---------------------------------------------------------------------------- FetcherThread是分析的重点,它继承自Thread,把它的run函数拆开来看: 第一部分: activeThreads.incrementAndGet(); // count threads   FetchItem fit = null; try { while (true) { fit = fetchQueues.getFetchItem(); if (fit == null) { if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) { // spin-wait. spinWaiting.incrementAndGet(); try { Thread.sleep(500); } catch (Exception e) { } spinWaiting.decrementAndGet(); continue; } else { // all done, finish this thread return; } } activeThread是增加活动线程的计数,下面的while循环中,fit是一个要抓取的项,这里如果返回的是null,但这并不表明没有可抓取的项了,因为getFetchItem是一个synchronized函数。从feeder和fetchQueues判断还有没有要抓取的了,如果有,就睡一会再get,如果没有了就结束线程。 再下来第二部分,页面抓取的一部分: // fetch the page redirecting = false; redirectCount = 0; do { // 有关抓取的内容 } while (redirecting && (redirectCount < maxRedirect)); Do/while循环退出的条件是,或不是redirecting,或是大于了maxRedirect次数,redirecting就是重定向。 redirecting = false; Protocol protocol = this.protocolFactory .getProtocol(fit.url.toString()); RobotRules rules = protocol.getRobotRules(fit.url,fit.datum); if (!rules.isAllowed(fit.u)) { // unblock fetchQueues.finishFetchItem(fit, true); output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE); continue; } if (rules.getCrawlDelay() > 0) { if (rules.getCrawlDelay() > maxCrawlDelay) { // unblock fetchQueues.finishFetchItem(fit, true); output( fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE); continue; } else { FetchItemQueue fiq = fetchQueues .getFetchItemQueue(fit.queueID); fiq.crawlDelay = rules.getCrawlDelay(); } } 这里从url字符串中得到相应的Protocol接口的实现的对象,并得到RobotRules对象。用它来判断url是否合法,如果不合法,就结束抓取,crawl delay就是爬虫两次访问的时间间隔,如果网站不支持maxCrawlDelay这么快的查询,也deny。 ProtocolOutput output = protocol.getProtocolOutput( fit.url, fit.datum); ProtocolStatus status = output.getStatus(); Content content = output.getContent(); ParseStatus pstatus = null; // unblock queue fetchQueues.finishFetchItem(fit);   String urlString = fit.url.toString();   switch (status.getCode()) { case ProtocolStatus.WOULDBLOCK: break;   case ProtocolStatus.SUCCESS: break; case XXXXXXXXXXXXX: break; } ProtocolOutput有两个成员变量:Content和ProtocolStatus。 Content中比较重要的变量有: private String url; private String base; private byte[] content; private String contentType; private Properties metadata; url是原始的url字符串,base是转换为URL后再toString()得到的字符串,contentType 说内网页类型,metadata元数据:meta标签是内嵌在你网页中的特殊html标签,包含着你有关于你网页的一些隐藏信息。Meat标签的作用是向搜索引擎解释你的网页是有关哪方面信息的。 比如对于http://quweiprotoss.blog.163.com/这个网址: url为:http://quweiprotoss.blog.163.com/ base为:http://quweiprotoss.blog.163.com/ contentType为:text/html;charset=gbk metadata为:{Cache-Control=no-cache, Server=nginx, Content-Encoding=gzip, Date=Tue, 23 Feb 2010 08:58:13 GMT, Pragma=no-cache, Vary=Host,User-Agent,Accept-Encoding, Set-Cookie=JSESSIONID=6443124A26B8A59B6E5B5C65DDEF45A7.app-58-8010; Path=/, Expires=Thu, 01 Jan 1970 00:00:00 GMT, Content-Type=text/html;charset=gbk, Connection=close} content为: Koala++'s blog - quweiprotoss - 网易博客 转过去看output的内容,先看前一部分: datum.setStatus(status); datum.setFetchTime(System.currentTimeMillis()); datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);   ParseResult parseResult = null; if (content != null) { try { scfilters.passScoreBeforeParsing(key, datum, content); } /* * Note: Fetcher will only follow meta-redirects coming from the * original URL. 
*/ if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) { try { parseResult = this.parseUtil.parse(content); } } } 在datum中设置状态,协议状态,这里scfilters是在页面分析前对一个url的权重得分计算,如果要解析就用parseUtil去解析,得到parseResult。 output.collect(key, new NutchWritable(datum)); if (content != null && storingContent) output.collect(key, new NutchWritable(content)); key是url,value是爬取的有关信息,如果保存页面,就保存。 for (Entry entry : parseResult) { Text url = entry.getKey(); Parse parse = entry.getValue(); ParseStatus parseStatus = parse.getData().getStatus();   // Calculate page signature. For non-parsing fetchers // this will be done in ParseSegment byte[] signature = SignatureFactory.getSignature( getConf()).calculate(content, parse); // Ensure segment name and score are in parseData // metadata parse.getData().getContentMeta().set( Nutch.SEGMENT_NAME_KEY, segmentName); parse.getData().getContentMeta().set( Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature)); // Pass fetch time to content meta parse.getData().getContentMeta().set( Nutch.FETCH_ 24 MAR 2010 18:41:37 +0800_KEY, Long.toString(datum.getFetchTime())); if (url.equals(key)) datum.setSignature(signature); try { scfilters.passScoreAfterParsing(url, content, parse); } output.collect(url, new NutchWritable(new ParseImpl( new ParseText(parse.getText()), parse.getData(), parse.isCanonical()))); } 这里循环处理解析后的数据,比如url为:http://vip.163.com/hd/shangdao/index.htm,则parse.getData为: Version: 5 Status: success(1,0) Title: 得商道,握天下! - 网易VIP尊贵邮 Outlinks: 30 outlink: toUrl: http://vip.163.com/hd/shangdao/;this.style.color= anchor: outlink: toUrl: http://vip.163.com/hd/shangdao/css/style.css anchor: outlink: toUrl: http://vip.163.com/hd/shangdao/js/base.js anchor: outlink: toUrl: http://vip.163.com/hd/shangdao/index.htm anchor: outlink: toUrl: http://vip.163.com/hd/shangdao/img/tbanner.jpg anchor: outlink: toUrl: http://vip.163.com/hd/shangdao/p1.htm anchor: 经济 商贸地图 浓缩展现全球经贸生态 全球股市 黄金原油行情实时查询 汇率换算 轻松贸易 而parse.getText为: 得商道,握天下! 
- 网易VIP尊贵邮 经济 商贸地图 浓缩展现全球经贸生态 全球股市 黄金原油行情实时查询 汇率换算 轻松贸易 商务 商贸应用文 写作更轻松 国际礼仪无所不知 各国 Signature默认用的是MD5Signature,它是页面签名的默认实现,它对页面原始的二进制内容求MD5值,如果没有内容,它计算页面URL的MD5值。它就是用于区分网页的,这里就暂时跳过,重点在passScoreAfterParsing中 在Fetcher中有一行代码是: job.setOutputFormat(FetcherOutputFormat.class); 写成入相应的文件。                  Nutch 1.0 源代码分析[8] ParseSegment   24 MAR 2010 18:42:47 +0800 ---------------------------------------------------------------------------- 在Crawl中,要关于解析的代码为: if (!Fetcher.isParsing(job)) { parseSegment.parse(segment); // parse it, if needed } 如果需要解析,而解析。而parse函数的代码为: public void parse(Path segment) throws IOException { JobConf job = new NutchJob(getConf()); job.setJobName("parse " + segment);   FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME)); job.set(Nutch.SEGMENT_NAME_KEY, segment.getName()); job.setInputFormat(SequenceFileInputFormat.class); job.setMapperClass(ParseSegment.class); job.setReducerClass(ParseSegment.class);   FileOutputFormat.setOutputPath(job, segment); job.setOutputFormat(ParseOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(ParseImpl.class);   JobClient.runJob(job); } 输入目录为Fetcher保存的content目录,map和reduce函数在ParseSegment中实现。 public void map(WritableComparable key, Content content, OutputCollector output, Reporter reporter) throws IOException { ParseResult parseResult = null; try { parseResult = new ParseUtil(getConf()).parse(content); }   for (Entry entry : parseResult) { Text url = entry.getKey(); Parse parse = entry.getValue(); ParseStatus parseStatus = parse.getData().getStatus();   // pass segment name to parse data parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, getConf().get(Nutch.SEGMENT_NAME_KEY));   // compute the new signature byte[] signature = SignatureFactory.getSignature(getConf()) .calculate(content, parse); parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));   try { scfilters.passScoreAfterParsing(url, content, parse); } output.collect(url, new ParseImpl( new ParseText(parse.getText()), parse.getData(), parse.isCanonical())); } } 这里的过程与在Fetcher中没有太多区别,计算singature,再计算解析后的得分。之所以有两次是因为:“fetcher.parse”这个属性用来判断是在fetcher网页过程中parser,还是在fetch完所有网页之后再parser。[若冰]。 Reduce函数如下: public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { output.collect(key, (Writable) values.next()); // collect first value } 它只取第一个值。 解析后的输出格式为ParseOutputFormat,在它的getRecordWriter函数中: Path out = FileOutputFormat.getOutputPath(job);   Path text = new Path(new Path(out, ParseText.DIR_NAME), name); Path data = new Path(new Path(out, ParseData.DIR_NAME), name); Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);   final MapFile.Writer textOut = new MapFile.Writer(job, fs, text .toString(), Text.class, ParseText.class, CompressionType.RECORD, progress);   final MapFile.Writer dataOut = new MapFile.Writer(job, fs, data .toString(), Text.class, ParseData.class, compType, progress);   final SequenceFile.Writer crawlOut = SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class, compType, progress); 可以看出它分为三个文件夹,parse_text,parse_data,crawl_parse。 写入解析后的文本是很简单的: textOut.append(key, new ParseText(parse.getText())); 把剩下的代码拆开来看: ParseData parseData = parse.getData(); // recover the signature prepared by Fetcher or ParseSegment String sig = parseData.getContentMeta() .get(Nutch.SIGNATURE_KEY); if (sig != null) { byte[] signature = StringUtil.fromHexString(sig); if (signature != null) { // append a CrawlDatum with a 
signature CrawlDatum d = new CrawlDatum( CrawlDatum.STATUS_SIGNATURE, 0); d.setSignature(signature); crawlOut.append(key, d); } } 得到解析后的signature,将CrawlDatum的signature设为此值,写入到crawl_parse中去。 ParseStatus pstatus = parseData.getStatus(); if (pstatus != null && pstatus.isSuccess() && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { String newUrl = pstatus.getMessage(); int refreshTime = Integer.valueOf(pstatus.getArgs()[1]); newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER); newUrl = filters.filter(newUrl); String url = key.toString(); if (newUrl != null && !newUrl.equals(url)) { String reprUrl = URLUtil.chooseRepr(url, newUrl, refreshTime < Fetcher.PERM_REFRESH_ 24 MAR 2010 18:42:47 +0800); CrawlDatum newDatum = new CrawlDatum(); newDatum.setStatus(CrawlDatum.STATUS_http://quweiprotoss.blog.163.com/blog/static/40882883201022464247852ED); if (reprUrl != null && !reprUrl.equals(newUrl)) { newDatum.getMetaData().put( Nutch.WRITABLE_REPR_URL_KEY, new Text(reprUrl)); } crawlOut.append(new Text(newUrl), newDatum); } } 这一段是对于解析成功但状态是SUCCESS_REDIRECT,重定向这种情况,它得到重定向后的url,对这个url进行规范化,如果这个url与原始的url不同,则将它记录下来。 // collect outlinks for subsequent db update Outlink[] links = parseData.getOutlinks(); int outlinksToStore = Math.min(maxOutlinks, links.length); if (ignoreExternalLinks) { try { fromHost = new URL(fromUrl).getHost().toLowerCase(); } catch (MalformedURLException e) { fromHost = null; } } else { fromHost = null; } 得到所有的外链,并得到这个页面的主机名。 int validCount = 0; CrawlDatum adjust = null; List> targets = new ArrayList>(outlinksToStore); List outlinkList = new ArrayList( outlinksToStore); for (int i = 0; i < links.length && validCount < outlinksToStore; i++) { String toUrl = links[i].getToUrl(); // ignore links to self (or anchors within the page) if (fromUrl.equals(toUrl)) { continue; } if (ignoreExternalLinks) { try { toHost = new URL(toUrl).getHost().toLowerCase(); } if (toHost == null || !toHost.equals(fromHost)) { continue; // skip it } } try { toUrl = normalizers.normalize(toUrl, URLNormalizers.SCOPE_OUThttp://quweiprotoss.blog.163.com/blog/static/40882883201022464247852); // normalize the url toUrl = filters.filter(toUrl); // filter the url } CrawlDatum target = new CrawlDatum( CrawlDatum.STATUS_http://quweiprotoss.blog.163.com/blog/static/40882883201022464247852ED, interval); Text targetUrl = new Text(toUrl); try { scfilters.initialScore(targetUrl, target); }   targets.add(new SimpleEntry(targetUrl, target)); outlinkList.add(links[i]); validCount++; } 将链接得到后,判断如果是自身,就不要了,如果选择忽略别的网站的外链,也忽略。再对URL进行规范化,过滤,最后计算它的得分。 try { // compute score contributions and adjustment to the original score adjust = scfilters.distributeScoreToOutlinks((Text) key, parseData, targets, null, links.length); } for (Entry target : targets) { crawlOut.append(target.getKey(), target.getValue()); } if (adjust != null) crawlOut.append(key, adjust); adjust是计算得分的分布后,对原先计算的分数进行调整,这里把所有得到的合法外链写入,再将调整后的CrawlDatum写入。 Outlink[] filteredLinks = outlinkList .toArray(new Outlink[outlinkList.size()]); parseData = new ParseData(parseData.getStatus(), parseData .getTitle(), filteredLinks, parseData.getContentMeta(), parseData.getParseMeta()); dataOut.append(key, parseData); if (!parse.isCanonical()) { CrawlDatum datum = new CrawlDatum(); datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS); String timeString = parse.getData().getContentMeta().get( Nutch.FETCH_ 24 MAR 2010 18:42:47 +0800_KEY); try { datum.setFetchTime(Long.parseLong(timeString)); } crawlOut.append(key, datum); } 前面是将Parsed 
data写入,再将非canonical的URL设置状态和抓取时间写入。            Nutch 1.0 源代码分析[8] CrawlDb   24 MAR 2010 18:44:08 +0800 ---------------------------------------------------------------------------- 再接下来Crawl类中的重要的一行就是: crawlDbTool.update(crawlDb, new Path[] { segment }, true, true); 下面就是updater的代码: boolean additionsAllowed = getConf().getBoolean( CRAWLDB_ADDITIONS_ALLOWED, true); update(crawlDb, segments, normalize, filter, additionsAllowed, false); 只是得到了是不是可以再增加。 Update的代码还是用map/reduce: public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter, boolean additionsAllowed, boolean force) throws IOException { JobConf job = CrawlDb.createJob(getConf(), crawlDb); job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed); job.setBoolean(CrawlDbFilter.URL_FILTERING, filter); job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize); for (int i = 0; i < segments.length; i++) { Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME); Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME); if (fs.exists(fetch) && fs.exists(parse)) { FileInputFormat.addInputPath(job, fetch); FileInputFormat.addInputPath(job, parse); } }   try { JobClient.runJob(job); }   CrawlDb.install(job, crawlDb); } 将是否增加,是否对URL进行过滤,是否进行规范化进行设置。这里将crawl_fetch和crawl_parse目录加入,install函数只是换一下目录而已,换成先前的current目录,也就是要爬的URL数据库。 public static JobConf createJob(Configuration config, Path crawlDb) throws IOException { Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random() .nextInt(Integer.MAX_VALUE)));   JobConf job = new NutchJob(config); job.setJobName("crawldb " + crawlDb);   Path current = new Path(crawlDb, CURRENT_NAME); if (FileSystem.get(job).exists(current)) { FileInputFormat.addInputPath(job, current); } job.setInputFormat(SequenceFileInputFormat.class);   job.setMapperClass(CrawlDbFilter.class); job.setReducerClass(CrawlDbReducer.class);   FileOutputFormat.setOutputPath(job, newCrawlDb); job.setOutputFormat(MapFileOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(CrawlDatum.class);   return job; } 将先前的要爬的数据库current也作为输入目录,Mapper类为CrawlDbFilter,Reducer类为CrawlDbReducer,输出目录为一个临时目录。 下面是CrawlDbFilter的map函数: public void map(Text key, CrawlDatum value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); if (urlNormalizers) { try { url = normalizers.normalize(url, scope); // normalize the url } } if (url != null && urlFiltering) { try { url = filters.filter(url); // filter the url } } if (url != null) { // if it passes newKey.set(url); // collect it output.collect(newKey, value); } } 这里对url进行规范化,过滤。 CrawlDbReducer中的reduce代码比较长,似乎规则比较多,大意就是将这些URL合并,将CrawlDatum的值设置为合适的。                Nutch 1.0 源代码分析[9] LinkDb   24 MAR 2010 18:45:02 +0800 ---------------------------------------------------------------------------- 再接下来Crawl中一个重要的类是LinkDb: linkDbTool.invert(linkDb, segments, true, true, false); 把linkDbTool拆成两部分: Path currentLinkDb = new Path(linkDb, CURRENT_NAME); JobConf job = LinkDb.createJob(getConf(), linkDb, normalize, filter); for (int i = 0; i < segments.length; i++) { FileInputFormat.addInputPath(job, new Path(segments[i], ParseData.DIR_NAME)); } try { JobClient.runJob(job); } 将每个segment中的parse_data作为输入加入。 private static JobConf createJob(Configuration config, Path linkDb, boolean normalize, boolean filter) { JobConf job = new NutchJob(config);   job.setInputFormat(SequenceFileInputFormat.class);   job.setMapperClass(LinkDb.class); job.setCombinerClass(LinkDbMerger.class); job.setReducerClass(LinkDbMerger.class);   
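// LinkDbMerger serves as both the combiner and the reducer here: inlinks
// pointing to the same URL are merged as early as possible, and the merged
// list is capped at maxInlinks in its reduce method (shown below).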
FileOutputFormat.setOutputPath(job, newLinkDb); job.setOutputFormat(MapFileOutputFormat.class); job.setBoolean("mapred.output.compress", true); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Inlinks.class);   return job; } 这里Mapper类为LinkDb,而Reducer类为LinkDbMerger。 LinkDb类中的map函数为: public void map(Text key, ParseData parseData, OutputCollector output, Reporter reporter) throws IOException { String fromUrl = key.toString(); String fromHost = getHost(fromUrl); if (urlNormalizers != null) { try { fromUrl = urlNormalizers.normalize(fromUrl, URLNormalizers.SCOPE_http://quweiprotoss.blog.163.com/blog/static/4088288320102246452550DB); // normalize the url } } if (fromUrl != null && urlFilters != null) { try { fromUrl = urlFilters.filter(fromUrl); // filter the url } } if (fromUrl == null) return; // discard all outlinks 得到原URL的主机名,对原URL进行规范化,过滤。 Outlink[] outlinks = parseData.getOutlinks(); Inlinks inlinks = new Inlinks(); for (int i = 0; i < outlinks.length; i++) { Outlink outlink = outlinks[i]; String toUrl = outlink.getToUrl();   if (ignoreInternalLinks) { String toHost = getHost(toUrl); if (toHost == null || toHost.equals(fromHost)) { continue; // skip it } } if (urlNormalizers != null) { try { toUrl = urlNormalizers.normalize(toUrl, URLNormalizers.SCOPE_http://quweiprotoss.blog.163.com/blog/static/4088288320102246452550DB); // normalize the url } } if (toUrl != null && urlFilters != null) { try { toUrl = urlFilters.filter(toUrl); // filter the url } } if (toUrl == null) continue; inlinks.clear(); String anchor = outlink.getAnchor(); // truncate long anchors if (anchor.length() > maxAnchorLength) { anchor = anchor.substring(0, maxAnchorLength); } inlinks.add(new Inlink(fromUrl, anchor));// collect inverted link output.collect(new Text(toUrl), inlinks); } } 得到所有的外链,如果这个外链和原URL有同一主机名,并且在忽略内部链接时,就不管了,接下来,对这个URL也进行规范化,过滤。再得到外链相应的锚点,将这个放入inlinks中,最后output得到的是key是外链URL,value是原URL和锚点合起来的一个对象。 Combiner和Reducer类是LinkDbMerger,它的reduce函数如下: public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { Inlinks result = new Inlinks();   while (values.hasNext()) { Inlinks inlinks = values.next();   int end = Math.min(maxInlinks - result.size(), inlinks.size()); Iterator it = inlinks.iterator(); int i = 0; while (it.hasNext() && i++ < end) { result.add(it.next()); } } if (result.size() == 0) return; output.collect(key, result);   } 每次将一个URL的外部链接合到一起,最多为maxInlinks。 再接下来是invert函数的下一部分: if (fs.exists(currentLinkDb)) { // try to merge Path newLinkDb = FileOutputFormat.getOutputPath(job); job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter); FileInputFormat.addInputPath(job, currentLinkDb); FileInputFormat.addInputPath(job, newLinkDb); try { JobClient.runJob(job); } fs.delete(newLinkDb, true); } 如果已经有一个linkdb了,那么试着合并,即将刚才产生的linkdb和以前的linkdb合并,createMergeJob的代码如下: public static JobConf createMergeJob(Configuration config, Path linkDb, boolean normalize, boolean filter) { JobConf job = new NutchJob(config);   job.setInputFormat(SequenceFileInputFormat.class);   job.setMapperClass(LinkDbFilter.class); job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize); job.setBoolean(LinkDbFilter.URL_FILTERING, filter); job.setReducerClass(LinkDbMerger.class);   FileOutputFormat.setOutputPath(job, newLinkDb); job.setOutputFormat(MapFileOutputFormat.class); job.setBoolean("mapred.output.compress", true); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Inlinks.class);   return job; } 这里Reducer类仍然是LinkDbMerger,而Mapper类是LinkDbFilter,它的map函数为: 
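// In the merge job this mapper re-emits every (url, inlinks) pair from both
// the old and the newly built linkdb, so that LinkDbMerger's reduce can
// combine them again into a single inlink list per URL.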
public void map(Text key, Inlinks value, OutputCollector output, Reporter reporter) throws IOException { String url = key.toString(); Inlinks result = new Inlinks();   Iterator it = value.iterator(); String fromUrl = null; while (it.hasNext()) { Inlink inlink = it.next(); fromUrl = inlink.getFromUrl(); if (fromUrl != null) { result.add(new Inlink(fromUrl, inlink.getAnchor())); } } if (result.size() > 0) { // don't collect empty inlinks newKey.set(url); output.collect(newKey, result); } } 这里又相当于是一个LinkDbMerger的反过程,把一个个又拆开,目的是进行下一步的reduce合并,这里add是没有关系的,因为Inlinks的成员变量inlinks是HashSet。        Nutch 1.0 源代码分析[10] Indexer   24 MAR 2010 18:46:08 +0800 ---------------------------------------------------------------------------- Crawl接下来就是建索引了: indexer.index(indexes, crawlDb, linkDb, Arrays.asList(HadoopFSUtil .getPaths(fstats))); Index代码如下: public void index(Path luceneDir, Path crawlDb, Path linkDb, List segments) throws IOException { final JobConf job = new NutchJob(getConf());   IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);   FileOutputFormat.setOutputPath(job, luceneDir);   LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job); LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);   NutchIndexWriterFactory.addClassToConf(job, LuceneWriter.class);   JobClient.runJob(job); } 这里已经可以看到一些 Lucene的影子了,Field的名字,是否保存,是否索引。 initMRJob的代码如下: public static void initMRJob(Path crawlDb, Path linkDb, Collection segments, JobConf job) { for (final Path segment : segments) { LOG.info("IndexerMapReduces: adding segment: " + segment); FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME)); FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME)); FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME)); FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME)); }   FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME)); FileInputFormat .addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME)); job.setInputFormat(SequenceFileInputFormat.class);   job.setMapperClass(IndexerMapReduce.class); job.setReducerClass(IndexerMapReduce.class);   job.setOutputFormat(IndexerOutputFormat.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(NutchWritable.class); job.setOutputValueClass(NutchWritable.class); } 将每个segment下的,crawl_fetch,crawl_parse,parse_data,parse_text加入输入目录,将current和linkdb也加入输入目录,map和reduce函数都在这个类中实现。 Map将同一URL的数据合到一起: public void map(Text key, Writable value, OutputCollector output, Reporter reporter) throws IOException { output.collect(key, new NutchWritable(value)); } 将reduce拆开来看: Inlinks inlinks = null; CrawlDatum dbDatum = null; CrawlDatum fetchDatum = null; ParseData parseData = null; ParseText parseText = null; while (values.hasNext()) { final Writable value = values.next().get(); // unwrap if (value instanceof Inlinks) { inlinks = (Inlinks) value; } else if (value instanceof CrawlDatum) { final CrawlDatum datum = (CrawlDatum) value; if (CrawlDatum.hasDbStatus(datum)) dbDatum = datum; else if (CrawlDatum.hasFetchStatus(datum)) { // don't index unmodified (empty) pages if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) fetchDatum = datum; } else if (CrawlDatum.STATUS_http://quweiprotoss.blog.163.com/blog/static/4088288320102246468505ED == datum.getStatus() || CrawlDatum.STATUS_SIGNATURE == 
datum.getStatus()) { continue; } } else if (value instanceof ParseData) { parseData = (ParseData) value; } else if (value instanceof ParseText) { parseText = (ParseText) value; } } 判断value是什么类型的,如果是CrawlDatum有可能是dbDatum也可能是fetchDatum。 NutchDocument doc = new NutchDocument(); final Metadata metadata = parseData.getContentMeta();   // add segment, used to map from merged index back to segment files doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));   // add digest, used by dedup doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));   final Parse parse = new ParseImpl(parseText, parseData); try { // extract information from dbDatum and pass it to // fetchDatum so that indexing filters can use it final Text url = (Text) dbDatum.getMetaData().get( Nutch.WRITABLE_REPR_URL_KEY); if (url != null) { fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url); } // run indexing filters doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks); }   float boost = 1.0f; // run scoring filters try { boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse, inlinks, boost); } // apply boost to all indexed fields. doc.setScore(boost); // store boost for use by explain and dedup doc.add("boost", Float.toString(boost));   output.collect(key, doc); Field的segment名字为保存在元数据中的SEGMENT_NAME_KEY,Field的digest为元数据中的SIGNATURE_KEY,它用于去重。将URL得到,再对这个文档进行过滤,再计算得分,返回key为URL,value为NutchDocument。                                         Hadoop 其实并非一个单纯用于存储的分布式文件系统,而是一个被设计用来在由普通硬件设备组成的大型集群上执行分布式应用的框架。 Hadoop 包含两个部分:一个分布式文件系统 HDFS (Hadoop Distributed File System),和一个Map-Reduce实现。       研究hadoop,从nutch入手是比较好的选择,分布式文件系统就不说了,下面说说MapReduce产生Job中设置的输入输出,一般new一个Job会这样设置 输入输出路径: Java代码 1. FileInputFormat.addInputPath(job, in);    2.    3. FileOutputFormat.setOutputPath(job, out);   FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job, out);   从方法名称上,你可能会发现add、set的前缀,没错,输入可以添加多个路径,输出只能设置一个路径。 设置输入、输出格式: Java代码 1. job.setInputFormat(SequenceFileInputFormat.class);    2.    3. job.setOutputFormat(MapFileOutputFormat.class);   job.setInputFormat(SequenceFileInputFormat.class); job.setOutputFormat(MapFileOutputFormat.class);   输出格式 看过nutch的同志,会发现nutch的一个精彩实现,就是实现OutputFormat接口的FetcherOutputFormat类,我们来看看怎么个回事。   接口 :org.apache.hadoop.mapred.OutputFormat Java代码 1. public interface OutputFormat {    2.    3. RecordWriter getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress)    4. throws IOException;    5.    6. void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;    7.    8. }   public interface OutputFormat { RecordWriter getRecordWriter(FileSystem ignored, JobConf job,String name, Progressable progress) throws IOException; void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException; }  checkOutputSpecs :检查job的输出路径是否存在,如果存在则抛出异常(IOException)。我这里的版本是0.19.2,还没有override的功能,可能后面会支持。  getRecordWriter     :把输出键值对 output 写入到输出路径中。   mapred下面的实现有三个,如下图:   基类FileOutputFormat :org.apache.hadoop.mapred.FileOutputFormat Java代码 1. public abstract class FileOutputFormat implements OutputFormat {    2.    3. public abstract RecordWriter getRecordWriter(FileSystem ignored,JobConf job, String name,Progressable progress)     4. throws IOException;    5.    6. public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException,     7. InvalidJobConfException, IOException {    8.    9.     // Ensure that the output directory is set and not already there    10.     Path outDir = getOutputPath(job);    11.     
Hadoop is not merely a distributed file system for storage; it is a framework designed to run distributed applications on large clusters built from ordinary commodity hardware. Hadoop consists of two parts: a distributed file system, HDFS (Hadoop Distributed File System), and a MapReduce implementation.

Nutch is a good place to start when studying Hadoop. Leaving the distributed file system aside, let's look at the input and output settings of a MapReduce job. A new job usually sets its input and output paths like this:

FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);

Notice the add and set prefixes in the method names: you can add multiple input paths, but you can only set a single output path.

Setting the input and output formats:

job.setInputFormat(SequenceFileInputFormat.class);
job.setOutputFormat(MapFileOutputFormat.class);

Output formats. Anyone who has read the Nutch code will have noticed one of its nicer tricks: classes such as FetcherOutputFormat (and ParseOutputFormat, analyzed below) that implement the OutputFormat interface. Let's see how this works.

The interface: org.apache.hadoop.mapred.OutputFormat

public interface OutputFormat {
  RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;
  void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
}

checkOutputSpecs: checks whether the job's output path already exists and throws an IOException if it does. The version used here is 0.19.2, which cannot yet overwrite an existing output directory; later versions may support that.
getRecordWriter: returns the RecordWriter that writes the output key-value pairs to the output path.

There are three implementations under the mapred package.

The base class FileOutputFormat: org.apache.hadoop.mapred.FileOutputFormat

public abstract class FileOutputFormat implements OutputFormat {

  public abstract RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException;

  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    // Ensure that the output directory is set and not already there
    Path outDir = getOutputPath(job);
    if (outDir == null && job.getNumReduceTasks() != 0) {
      throw new InvalidJobConfException("Output directory not set in JobConf.");
    }
    if (outDir != null) {
      FileSystem fs = outDir.getFileSystem(job);
      // normalize the output directory
      outDir = fs.makeQualified(outDir);
      setOutputPath(job, outDir);
      // check its existence
      if (fs.exists(outDir)) {
        throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
      }
    }
  }
}

This is an abstract class: it implements the check for whether the output path already exists, and leaves the actual way of writing as an abstract method for its subclasses. One of those subclasses is MapFileOutputFormat.

The subclass MapFileOutputFormat: org.apache.hadoop.mapred.MapFileOutputFormat

public class MapFileOutputFormat extends FileOutputFormat {

  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
    // get the path of the temporary output file
    Path file = FileOutputFormat.getTaskOutputPath(job, name);

    FileSystem fs = file.getFileSystem(job);
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(job)) {
      // find the kind of compression to do
      compressionType = SequenceFileOutputFormat.getOutputCompressionType(job);

      // find the right codec
      Class codecClass = getOutputCompressorClass(job, DefaultCodec.class);
      codec = ReflectionUtils.newInstance(codecClass, job);
    }

    // ignore the progress parameter, since MapFile is local
    final MapFile.Writer out =
        new MapFile.Writer(job, fs, file.toString(),
            job.getOutputKeyClass().asSubclass(WritableComparable.class),
            job.getOutputValueClass().asSubclass(Writable.class),
            compressionType, codec, progress);

    return new RecordWriter() {

      public void write(WritableComparable key, Writable value) throws IOException {
        out.append(key, value);
      }

      public void close(Reporter reporter) throws IOException {
        out.close();
      }
    };
  }
}

The key point is obtaining MapFile.Writer, the handle for writing to the file system, and closing it once the output is finished. Each of the other implementations has its own specific use and is worth understanding, but they will not all be described here.
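As an aside, here is a rough sketch of my own (the path is made up) of how output written by MapFileOutputFormat is usually read back, which is more or less what Nutch's segment readers do: open one MapFile.Reader per part directory and locate the right part with the same partitioner the job used:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapFileOutputFormat;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.nutch.parse.ParseData;

public class MapFileLookupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // hypothetical directory that a job using MapFileOutputFormat wrote
    Path dir = new Path("crawl/segments/20100324184608/parse_data");

    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, conf);
    Partitioner<Text, Writable> partitioner = new HashPartitioner<Text, Writable>();

    Text url = new Text("http://www.example.com/");
    ParseData data = new ParseData();
    // picks the part-xxxxx the key was hashed to, then looks the key up there
    Writable hit = MapFileOutputFormat.getEntry(readers, partitioner, url, data);
    System.out.println(hit == null ? "not found" : data);

    for (MapFile.Reader reader : readers) {
      reader.close();
    }
  }
}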
The classes above are Hadoop's own implementations. In real programming we will often want an implementation of our own to define the output format. As noted above, a job can only set a single output path and cannot add several, so what can be done about that? Nutch has an elegant answer that is quite instructive:

A custom implementation: org.apache.nutch.parse.ParseOutputFormat

public class ParseOutputFormat implements OutputFormat {

  // instead of checking the output path itself, this checks a subdirectory
  // under it, changing what the interface method originally meant
  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    if (fs.exists(new Path(out, CrawlDatum.PARSE_DIR_NAME)))
      throw new IOException("Segment already parsed!");
  }

  // three output writers are obtained below, each emitting key-value pairs to its own path
  public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress) throws IOException {

    ......
    Path text = new Path(new Path(out, ParseText.DIR_NAME), name);          // first output path
    Path data = new Path(new Path(out, ParseData.DIR_NAME), name);          // second output path
    Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);  // third output path

    // first writer
    final MapFile.Writer textOut =
        new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
            CompressionType.RECORD, progress);

    // second writer
    final MapFile.Writer dataOut =
        new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
            compType, progress);

    // third writer
    final SequenceFile.Writer crawlOut =
        SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
            compType, progress);

    return new RecordWriter() {

      public void write(Text key, Parse parse) throws IOException {
        ......
        crawlOut.append(key, d);
        ......
        crawlOut.append(new Text(newUrl), newDatum);
        ......
        crawlOut.append(key, adjust);
        ......
        dataOut.append(key, parseData);
        ......
        crawlOut.append(key, datum);
      }

      // close all three writers
      public void close(Reporter reporter) throws IOException {
        textOut.close();
        dataOut.close();
        crawlOut.close();
      }
    };
  }
}

ParseOutputFormat implements the OutputFormat interface, reinterprets the single output path set on the job, and writes different content to different paths beneath it, thereby producing multiple outputs split along logical lines. That is a pattern well worth borrowing.
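To show the pattern with the Nutch specifics stripped away, here is a minimal sketch of my own (class, directory and value choices are all made up) that fans each record out into two sub-directories of the single output path configured on the job, against the same old mapred API used above:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class TwoDirOutputFormat implements OutputFormat<Text, Text> {

  // refuse to run if one of our sub-directories has already been written
  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    FileSystem fs = out.getFileSystem(job);
    if (fs.exists(new Path(out, "keys")))
      throw new IOException("Output " + out + " already written!");
  }

  public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    Path out = FileOutputFormat.getOutputPath(job);
    FileSystem fs = out.getFileSystem(job);

    // one writer per logical output, both under the job's single output path
    final SequenceFile.Writer keyOut = SequenceFile.createWriter(
        fs, job, new Path(new Path(out, "keys"), name), Text.class, Text.class);
    final SequenceFile.Writer recordOut = SequenceFile.createWriter(
        fs, job, new Path(new Path(out, "records"), name), Text.class, Text.class);

    return new RecordWriter<Text, Text>() {
      public void write(Text key, Text value) throws IOException {
        keyOut.append(key, new Text(""));  // e.g. only the keys
        recordOut.append(key, value);      // the full records
      }
      public void close(Reporter reporter) throws IOException {
        keyOut.close();
        recordOut.close();
      }
    };
  }
}

Wiring it in is just job.setOutputFormat(TwoDirOutputFormat.class); the job still sees one output path, while the record writer splits the data underneath it, much as ParseOutputFormat does with crawl_parse, parse_data and parse_text.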
What each of the input and output implementations is good for is something to write about another time; my own understanding is still rather superficial, so please bear with me.
