Storm 入门指南

bobozhuli

贡献于2014-06-07

字数:0 关键词: 分布式/云计算/大数据

Storm 入门指南 整理:毛祥溢 Email:maoxiangyi@jd.com 目录 第一章 大数据实时计算框架 ......................................................................................................... 4 1.1 实时计算的概念 ........................................................................................................... 4 1 数据源是实时的不间断的,要求对用户的响应时间也是实时的 ........................... 4 2 数据量大且无法或没必要预算,但要求对用户的响应时间是实时的。 ............... 4 1.2 实时计算相关技术 ....................................................................................................... 4 1 数据实时采集 ............................................................................................................... 5 2 数据实时计算 ............................................................................................................... 5 3 实时查询服务 ............................................................................................................... 5 4 总结 ............................................................................................................................... 5 1.3 早期产品 ....................................................................................................................... 5 1 IBM 的 StreamBase ........................................................................................................ 5 2 Borealis ........................................................................................................................... 6 1.4 近期产品 ....................................................................................................................... 6 1 Yahoo 的 S4 .................................................................................................................... 6 2 Twitter 实时计算 ........................................................................................................... 6 3 Facebook 的 Puma ........................................................................................................ 6 4 淘宝的实时计算、流式处理 ........................................................................................ 7 5 其它的实时计算系统 .................................................................................................... 7 第二章 Storm 框架概述 .................................................................................................................. 7 2.1 概述 ............................................................................................................................... 7 2.2 Storm ............................................................................................................................. 9 1 特性 ............................................................................................................................... 9 2 Storm 组件 ..................................................................................................................... 9 3 Zookeeper....................................................................................................................... 9 4 编程模型 spout、Bolt、Stream Groupings .............................................................. 10 第三章 Storm 伪分布式环境搭建(Redhat)............................................................................ 10 3.1 yum 配置安装 ........................................................................................................... 10 1 删除 redhat 原有的 yum ............................................................................................ 11 2 下载 yum 安装文件 .................................................................................................... 11 3 进行安装 yum ............................................................................................................. 11 4 配置网易源 ................................................................................................................. 11 5 初始化 ......................................................................................................................... 12 6 测试 ............................................................................................................................. 13 7 yum 资料 ...................................................................................................................... 13 3.2 搭建 Zookeeper 集群................................................................................................ 13 1 搭建 Zookeeper 集群.................................................................................................. 13 2 安装 Storm 依赖库 ..................................................................................................... 14 3 下载并解压 Storm 发布版本 ..................................................................................... 15 4 修改 storm.yaml 配置文件 ......................................................................................... 15 5 启动 Storm 各个后台进程 ......................................................................................... 16 6 向集群提交任务 ......................................................................................................... 16 7 提示 ............................................................................................................................. 17 3.3 软件列表 ..................................................................................................................... 17 1 目录 ............................................................................................................................. 17 2 软件包 .......................................................................................................................... 18 第四章 Storm Demo 编写 ............................................................................................................. 18 4.1 开发环境搭建 ........................................................................................................... 18 1 安装 eclipse ................................................................................................................ 18 2 安装 maven ................................................................................................................. 18 3 安装 maven2eclipse .................................................................................................... 18 4 pom.xml ....................................................................................................................... 18 4.2 wordcounter 单词计数器 ......................................................................................... 18 1 拓扑驱动类 ................................................................................................................. 18 2 WordReader 读取数据源 ......................................................................................... 22 3 WordNormalizer 切割句子成单词 ....................................................................... 25 4 WordCounter 计数器 ............................................................................................. 26 5 拓扑流程日志 ............................................................................................................. 28 4.3 speed 车辆速度监控 ................................................................ 错误!未定义书签。 第一章 大数据实时计算框架 1.1 实时计算的概念 互联网领域的实时计算一般都是针对海量数据进行的,除了像非实时计算的需求(如计 算结果准确)以外,实时计算最重要的一个需求是能够实时响应计算结果,一般要求为秒级。 个人理解,互联网行业的实时计算可以分为以下两种应用场景: 1 数据源是实时的不间断的,要求对用户的响应时间也是实时的 主要用于互联网流式数据处理。所谓流式数据是指将数据看作是数据流的形式来处理。 数据流则是在时间分布和数量上无限的一系列数据记录的集合体;数据记录是数据流的最小 组成单元。举个例子,对于大型网站,活跃的流式数据非常常见,这些数据包括网站的访问 PV/UV、用户访问了什么内容,搜索了什么内容等。实时的数据计算和分析可以动态实时地 刷新用户访问数据,展示网站实时流量的变化情况,分析每天各小时的流量和用户分布情况, 这对于大型网站来说具有重要的实际意义。 2 数据量大且无法或没必要预算,但要求对用户的响应时间是实时的。 主要用于特定场合下的数据分析处理。当数据量很大,同时发现无法穷举所有可能条件 的查询组合或者大量穷举出来的条件组合无用的时候,实时计算就可以发挥作用,将计算过 程推迟到查询阶段进行,但需要为用户提供实时响应。 1.2 实时计算相关技术 互联网上海量数据(一般为日志流)的实时计算过程可以被划分为以下三个阶段:数据 的产生与收集阶段、传输与分析处理阶段、存储对对外提供服务阶段。 下面分别进行简单的介绍: 1 数据实时采集 需求:功能上保证可以完整的收集到所有日志数据,为实时应用提供实时数据;响应时 间上要保证实时性、低延迟在 1 秒左右;配置简单,部署容易;系统稳定可靠等。 目前,互联网企业的海量数据采集工具,有 Facebook 开源的 Scribe、LinkedIn 开源的 Kafka、 Cloudera 开源的 Flume,淘宝开源的 TimeTunnel、Hadoop 的 Chukwa 等,均可以满足每秒数 百 MB 的日志数据采集和传输需求。 2 数据实时计算 传统的数据操作,首先将数据采集并存储在 DBMS 中,然后通过 query 和 DBMS 进行交 互,得到用户想要的答案。整个过程中,用户是主动的,而 DBMS 系统是被动的。但是,对 于现在大量存在的实时数据,比如股票交易的数据,这类数据实时性强,数据量大,没有止 境,传统的架构并不合适。流计算就是专门针对这种数据类型准备的。在流数据不断变化的 运动过程中实时地进行分析,捕捉到可能对用户有用的信息,并把结果发送出去。整个过程 中,数据分析处理系统是主动的,而用户却是处于被动接收的状态。 需求:适应流式数据、不间断查询;系统稳定可靠、可扩展性好、可维护性好等。 实时流计算框架:Yahoo 开源的 S4、Twitter 开源的 Storm,还有 Esper,Streambase, HStreaming 等。有关计算的一些注意点:分布式计算,并行计算(节点间的并行、节点内的 并行),热点数据的缓存策略,服务端计算。 3 实时查询服务 全内存:直接提供数据读取服务,定期 dump 到磁盘或数据库进行持久化。 半内存:使用 Redis、Memcache、MongoDB、BerkeleyDB 等内存数据库提供数据实时查 询服务,由这些系统进行持久化操作。 全磁盘:使用 HBase 等以分布式文件系统(HDFS)为基础的 NoSQL 数据库,对于 key-value 引擎,关键是设计好 key 的分布。 4 总结 A:并不是任何应用都做到实时计算才是最好的。 B:使用哪些技术和框架来搭建实时计算系统,需要根据实际业务需求进行选择。 C:对于分布式系统来说,系统的可配置性、可维护性、可扩展性十分重要,系统调优 永无止境。 1.3 早期产品 1 IBM 的 StreamBase StreamBase 是 IBM 开发的一款商业流式计算系统,在金融行业和政府部门使用 官方网站:http://www.streambase.com 2 Borealis Brandeis University、Brown University 和 MIT 合作开发的一个分布式流式系统,由之前 的流式系统 Aurora、Medusa 演化而来,学术研究的一个产品,08 年已经停止维护 1.4 近期产品 1 Yahoo 的 S4 S4 是一个通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统,Yahoo! 开发 S4 系统,主要是为了解决:搜索广告的展现、处理用户的点击反馈。 官方网站:http://s4.io/ S4 简介:http://www.programmer.com.cn/5304/ 2 Twitter 实时计算 2.1 Twitter 的 storm:Storm 是一个分布式的、容错的实时计算系统 Storm 用途:可用于处理消息和更新数据库(流处理),在数据流上进行持续查询,并以流 的形式返回结果到客户端(持续计算),并行化一个类似实时查询的热点查询(分布式的 RPC)。 官方指南:https://storm.canonical.com/Tutorial github 下载:https://github.com/nathanmarz/storm/downloads storm 详解:http://duanple.blog.163.com/blog/static/7097176720111020102057795/ storm 配置详解:http://www.tbdata.org/archives/2118 storm 翻译及总结博客:http://chenlx.blog.51cto.com/4096635/d-1/p-1 2.2. Twitter 的 Rainbird:Rainbird 是一款分布式实时统计系统, Rainbird 可以用于实时数 据的统计:(1)统计网站中每一个页面,域名的点击次数,(2)内部系统的运行监控(统计被监 控服务器的运行状态),(3) 记录最大值和最小值。 官方简介: http://www.slideshare.net/kevinweil/rainbird-realtime-analytics-at-twitter-strata-2011 中文介绍: http://www.cnblogs.com/gpcuster/archive/2011/02/06/1949466.html 3 Facebook 的 Puma facebook 使用 puma 和 Habase 相结合来处理实时数据,另外 facebook 发表一篇利用 HBase/Hadoop 进行实时数据处理的论文(Apache Hadoop Goes Realtime at Facebook),通过 一些实时性改造,让批处理计算平台也具备实时计算的能力。 4 淘宝的实时计算、流式处理 4.1 银河流数据处理平台:通用的流数据实时计算系统,以实时数据产出的低延迟、高吞 吐和复用性为初衷和目标,采用 actor 模型构建分布式流数据计算框架(底层基于 akka), 功能易扩展、部分容错、数据和状态可监控。 银河具有处理实时流数据(如 TimeTunnel 收 集的实时数据)和静态数据(如本地文件、HDFS 文件)的能力,能够提供灵活的实时数据 输出,并提供自定义的数据输出接口以便扩展实时计算能力。 银河目前主要是为魔方提供 实时的交易、浏览和搜索日志等数据的实时计算和分析。 4.2.基于 storm 的流式处理,统计计算、持续计算、实时消息处理。 4.3 利用 Habase 实现的 online 应用。 5 其它的实时计算系统 Hstreaming:官方网站:http://www.hstreaming.com/technology/hstreaming/ Esper:esper 可以用在股票系统、风险监控系统等等要求实时性比较高的系统中 官方网站:http://www.espertech.com/ 官网的英文简介:http://www.espertech.com/products/esper.php 中文简介:http://www.cnblogs.com/qlee/archive/2011/06/22/2086550.html 以上数据来源于:http://www.cnblogs.com/panfeng412/archive/2011/10/28/2227195.html 第二章 Storm 框架概述 2.1 概述 在过去的十年里,数据处理发生了革命性的变化。MapReduce,Hadoop,以及相关的 技术使我们可以存储和处理以前不可想象规模的数据。很遗憾,这些数据处理系统都不是实 时系统,命中注定也不是它们。根本没办法把 Hadoop 变成一个实时系统;实时数据处理和 批处理的许多要求在根本上有很大不同。 然而,企业对大规模实时数据处理要求越来越多。缺乏“实时 Hadoop”是数据处理生 态系统中最大的窘境。 Storm 解决了这个窘境。 Storm 之前,你通常必须手动建立一个由许多队列和许多 worker 组成的网络来实现实 时处理。worker 处理队列消息,更新数据库,发送新消息给其它队列以供后续处理。很遗 憾, 这种方法有很大的局限性: 乏味:你大部份开发时间花费在配置消息发送,部署 worker,部署中间队列。你关心 的实时处理逻辑对应到你的代码的比例相对较小 。 脆弱:没有多少容错。你负责保持每个 worker 和队列正常工作。 痛苦伸缩:当单个 worker 或队列的消息吞吐量太高时,你需要分区,即数据如何分散。 你需要重新配置其它 worker,让它们发送消息到新位置。这导致删除或添加部件都可能失 败。 虽然队列+workers 的范式能解决大量的消息,消息处理显然是实时计算的基本范式。问 题是:你要怎么做,才能在某种程度上保证数据不会丢失,对海量消息轻松扩容,并且使用 和运营工作都超级简单呢?Storm 满足这些目标。 Storm 公开(expose)一组实时计算原语。类似 MapReduce 极大地简化了编写并行批处 理程序,storm 的原语极大地简化了编写并行实时计算程序。 Storm 的关键特性: 用例非常广泛:Storm 可用于处理消息和更新数据库(流处理),在数据流上进行持续 查询,并以流的形式返回结果到客户端(持续计算),并行化一个类似实时查询的热点查询 (分布式的 RPC),还有更多的用例。Storm 的一组很小的原语满足了惊人数量的用例。 可伸缩:Storm 随时都可对大规模消息进行扩容。扩容一个拓扑,你只需要添加机器和 增加的拓扑结构的并行设置。看一个 storm 规模的例子,一个 storm 集群有 10 个节点,一 个最初的Storm应用每秒可以处理1,000,000个消息(指spout和bolt总共发射的消息总和), 拓扑的其中一部分每秒数有数百个数据库调用。Storm 使用 Zookeeper 协调集群,使其集群 可以扩容到非常大。 保证数据不丢失:实时系统必须对成功处理数据提供有力保证 。系统丢弃数据的用例 非常有限。Storm 保证每个消息都被处理,这直接与其它系统截然不同,如 S4。 非常健壮:Storm 与 Hadoop 不同,Hadoop 难于管理早已臭名昭著,Storm 集群只是干 活。使用户尽可能方便地管理 storm 集群是 storm 项目的一个明确目标。 容错:计算的执行过程中如果发生故障,Storm 将在必要时重新分配任务。Storm 确保计算 永远运行(或者直到你 kill 此计算) 。 编程语言无关性:健壮和可伸缩的实时处理不应仅限于一个单一的平台。Storm 的拓扑 结构和处理组件可以用任何语言定义,对任何人而言,Storm 都是易接受的。 更多信息请参考:http://www.maoxiangyi.cn/index.php/archives/337 2.2 Storm 1 特性 对比 Hadoop 的批处理,Storm 是个实时的、分布式以及具备高容错的计算系统。同 Hadoop 一样 Storm 也可以处理大批量的数据,然而 Storm 在保证高可靠性的前提下还可以 让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm 同样还具备容错和分布计 算这些特性,这就让 Storm 可以扩展到不同的机器上进行大批量的数据处理。他同样还有以 下的这些特性: ·易于扩展。对于扩展,你只需要添加机器和改变对应的 topology(拓扑)设置。Storm 使用 Hadoop Zookeeper 进行集群协调,这样可以充分的保证大型集群的良好运行。 ·每条信息的处理都可以得到保证。 ·Storm 集群管理简易。 ·Storm 的容错机能:一旦 topology 递交,Storm 会一直运行它直到 topology 被废除或 者被关闭。而在执行中出现错误时,也会由 Storm 重新分配任务。 ·尽管通常使用 Java,Storm 中的 topology 可以用任何语言设计。 当然为了更好的理解文章,你首先需要安装和设置 Storm。需要通过以下几个简单的步 骤: ·从 Storm 官方下载 Storm 安装文件 ·将 bin/directory 解压到你的 PATH 上,并保证 bin/storm 脚本是可执行的。 2 Storm 组件 Storm 集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper 进行 协调。 主节点: 主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任 务和监测故障。这个很类似于 Hadoop 中的 Job Tracker。 工作节点: 工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运 行工作进程。每个工作节点都是 topology 中一个子集的实现。而 Nimbus 和 Supervisor 之间 的协调则通过 Zookeeper 系统或者集群。 3 Zookeeper Zookeeper 是完成 Supervisor 和 Nimbus 之间协调的服务。而应用程序实现实时的逻辑则 被封装进 Storm 中的“topology”。 topology 则是一组由 Spouts(数据源)和 Bolts(数据操作)通 过 Stream Groupings 进行连接的图。 4 编程模型 spout、Bolt、Stream Groupings Spout 简而言之,Spout 从来源处读取数据并放入 topology。Spout 分成可靠和不可靠两种;当 Storm 接收失败时,可靠的 Spout 会对 tuple(元组,数据项组成的列表)进行重发;而不可靠的 Spout 不会考虑接收成功与否只发射一次。而 Spout 中最主要的方法就是 nextTuple(),该方 法会发射一个新的 tuple 到 topology,如果没有新 tuple 发射则会简单的返回。 Bolt Topology 中所有的处理都由 Bolt 完成。Bolt 可以完成任何事,比如:连接的过滤、聚合、 访问文件/数据库、等等。Bolt 从 Spout 中接收数据并进行处理,如果遇到复杂流的处理也 可能将 tuple 发送给另一个 Bolt 进行处理。而 Bolt 中最重要的方法是 execute(),以新的 tuple 作为参数接收。不管是 Spout 还是 Bolt,如果将 tuple 发射成多个流,这些流都可以通过 declareStream()来声明。 Stream Groupings Stream Grouping 定义了一个流在 Bolt 任务间该如何被切分。这里有 Storm 提供的 6 个 Stream Grouping 类型: 1. 随机分组(Shuffle grouping):随机分发 tuple 到 Bolt 的任务,保证每个任务获得相等 数量的 tuple。 2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id” 字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不 同的任务。 3. 全部分组(All grouping):tuple 被复制到 bolt 的所有任务。这种类型需要谨慎使用。 4. 全局分组(Global grouping):全部流都分配到 bolt 的同一个任务。明确地说,是分配 给 ID 最小的那个 task。 5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。 但最终,Storm 将把无分组的 Bolts 放到 Bolts 或 Spouts 订阅它们的同一线程去执行(如果可 能)。 6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定 tuple 由哪个 元组处理者任务接收。 当然还可以实现 CustomStreamGroupimg 接口来定制自己需要的分组。 第三章 Storm 伪分布式环境搭建(Redhat) 3.1 yum 配置安装 在配置 storm 集群时需要安装很多依赖包,手动安装的话比较费时费力,还不一定能安 装成功。建议使用 yum 进行安装,先配置下 Yum。 由于 redhat 的 yum 在线更新是收费的,如果没有注册的话不能使用,如果要使用,需 将 redhat 的 yum 卸载后,重启安装,再配置其他源 . 第 三 方 源 包 括 : 网 易 , epel,repoforge ,rpmfusion 以下为详细过程: 1 删除 redhat 原有的 yum rpm -aq|grep yum|xargs rpm -e --nodeps 2 下载 yum 安装文件 (以下路径需要和 redhat 的版本对应) 注意,如果下载时找不到文件,就登录到:http://mirrors.163.com/centos/6/os/x86_64/ 上 查找相应的文件。然后再下载。 Wget http://mirrors.163.com/centos/6/os/x86_64/Packages/yum-3.2.27-14.el6.centos.noarch.rpm wget http://mirrors.163.com/centos/6/os/x86_64/Packages/yum-metadata-parser-1.1.2-14.1.el6.x86_ 64.rpm wget http://mirrors.163.com/centos/6/os/x86_64/Packages/yum-plugin-fastestmirror-1.1.26-11.el6.n oarch.rpm wget http://mirrors.163.com/centos/6/os/x86_64/Packages/python-iniparse-0.3.1-2.1.el6.noarch.rpm 3 进行安装 yum rpm -ivh python-iniparse-0.3.1-2.1.el6.noarch.rpm rpm -ivh yum-metadata-parser-1.1.2-14.1.el6.x86_64.rpm rpm -ivh yum-3.2.27-14.el6.centos.noarch.rpm um-plugin-fastestmirror-1.1.26-11.el6.noarch.rpm 注意最后两个包必需同时安装,否则会相互依赖 ----------配置网易源 4 配置网易源 mv /etc/yum.repos.d/rhel-debuginfo.repo /etc/yum.repos.d/rhel-debuginfo.repo.repo.bak vi /etc/yum.repos.d/rhel-debuginfo.repo 内容为:(url 的地址要根据不同的系统进行改变) [base] name=CentOS-$releasever - Base baseurl=http://mirrors.163.com/centos/6.0/os/$basearch/ gpgcheck=1 gpgkey=http://mirrors.163.com/centos/RPM-GPG-KEY-CentOS-6 #released updates [updates] name=CentOS-$releasever - Updates baseurl=http://mirrors.163.com/centos/6.0/updates/$basearch/ gpgcheck=1 gpgkey=http://mirrors.163.com/centos/RPM-GPG-KEY-CentOS-6 #packages used/produced in the build but not released #[addons] #name=CentOS-$releasever - Addons #baseurl=http://mirrors.163.com/centos/$releasever/addons/$basearch/ #gpgcheck=1 #gpgkey=http://mirrors.163.com/centos/RPM-GPG-KEY-CentOS-6 #additional packages that may be useful [extras] name=CentOS-$releasever - Extras baseurl=http://mirrors.163.com/centos/6.0/extras/$basearch/ gpgcheck=1 gpgkey=http://mirrors.163.com/centos/RPM-GPG-KEY-CentOS-6 #additional packages that extend functionality of existing packages [centosplus] name=CentOS-$releasever - Plus baseurl=http://mirrors.163.com/centos/6.0/centosplus/$basearch/ gpgcheck=1 enabled=0 5 初始化 yum clean all 6 测试 yum install vim #测试一下可不可以用 7 yum 资料 以上配置节选于:http://space.itpub.net/25313300/viewspace-708509 3.2 搭建 Zookeeper 集群 1 搭建 Zookeeper 集群 Storm 使用 Zookeeper 协调集群,由于 Zookeeper 并不用于消息传递,所以 Storm 给 Zookeeper 带来的压力相当低。大多数情况下,单个节点的 Zookeeper 集群足够胜任,不过 为了确保故障恢复或者部署大规模 Storm 集群,可能需要更大规模节点的 Zookeeper 集群(对 于 Zookeeper 集群的话,官方推荐的最小节点数为 3 个)。 在 Zookeeper 集群的每台机器上完成以下安装部署步骤: 1. 下载安装 Java JDK,官方下载链接为 http://java.sun.com/javase/downloads/index.jsp,JDK 版本为 JDK 6 或以上。 2. 根据 Zookeeper 集群的负载情况,合理设置 Java 堆大小,尽可能避免发生 swap,导致 Zookeeper 性能下降。保守起见,4GB 内存的机器可以为 Zookeeper 分配 3GB 最大堆空间。 3. 下载后解压安装 Zookeeper 包,官方下载链接为 http://hadoop.apache.org/zookeeper/releases.html。 4. 根据 Zookeeper 集群节点情况,在 conf 目录下创建 Zookeeper 配置文件 zoo.cfg: tickTime=2000 dataDir=/var/zookeeper/ clientPort=2181 initLimit=5 syncLimit=2 server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888 其中,dataDir 指定 Zookeeper 的数据文件目录;其中 server.id=host:port:port,id 是为每个 Zookeeper 节点的编号,保存在 dataDir 目录下的 myid 文件中,zoo1~zoo3 表示各个 Zookeeper 节点的 hostname,第一个 port 是用于连接 leader 的端口,第二个 port 是用于 leader 选举的 端口。 5. 在 dataDir 目录下创建 myid 文件,文件中只包含一行,且内容为该节点对应的 server.id 中的 id 编号。 6. 启动 Zookeeper 服务: Java -cp zookeeper.jar:lib/log4j-1.2.15.jar:conf \ org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.cfg 或者 bin/zkServer.sh start 7. 通过 Zookeeper 客户端测试服务是否可用: Java -cp zookeeper.jar:src/java/lib/log4j-1.2.15.jar:conf:src/java/lib/jline-0.9.94.jar \ org.apache.zookeeper.ZooKeeperMain -server 127.0.0.1:2181 或者 bin/zkCli.sh -server 127.0.0.1:2181 注意事项: 由于 Zookeeper 是快速失败(fail-fast)的,且遇到任何错误情况,进程均会退出,因此,最 好能通过监控程序将 Zookeeper 管理起来,保证 Zookeeper 退出后能被自动重启。详情参考 这里。 Zookeeper 运行过程中会在 dataDir 目录下生成很多日志和快照文件,而 Zookeeper 运行进程 并不负责定期清理合并这些文件,导致占用大量磁盘空间,因此,需要通过 cron 等方式定 期清除没用的日志和快照文件。详情参考这里。具体命令格式如下:java -cp zookeeper.jar:log4j.jar:conf org.apache.zookeeper.server.PurgeTxnLog -n 2 安装 Storm 依赖库 1 安装 ZMQ 2.1.7 请勿使用 2.1.10 版本,因为该版本的一些严重 bug 会导致 Storm 集群运 行时出现奇怪的问题。少数用户在 2.1.7 版本会遇到”IllegalArgumentException”的异常,此 时降为 2.1.4 版本可修复这一问题。 下载后编译安装 ZMQ: wget http://download.zeromq.org/zeromq-2.1.7.tar.gz tar -xzf zeromq-2.1.7.tar.gz cd zeromq-2.1.7 ./configure make sudo make install 注意事项: 如果安装过程报错 uuid 找不到,则通过如下的包安装 uuid 库: sudo yum install e2fsprogsl -b current sudo yum install e2fsprogs-devel -b current 2 安装 JZMQ 下载后编译安装 JZMQ: git clone https://github.com/nathanmarz/jzmq.git cd jzmq ./autogen.sh ./configure make sudo make install 为了保证 JZMQ 正常工作,可能需要完成以下配置: 正确设置 JAVA_HOME 环境变量 安装 Java 开发包 升级 autoconf 3 安装 Java 6 4 安装 Python2.6.6 1. 下载 Python2.6.6: wget http://www.python.org/ftp/python/2.6.6/Python-2.6.6.tar.bz2 2. 编译安装 Python2.6.6: tar –jxvf Python-2.6.6.tar.bz2 cd Python-2.6.6 ./configure make make install 3. 测试 Python2.6.6 python -V Python 2.6.6 5 安装 unzip 1. 如果使用 RedHat 系列 Linux 系统,执行以下命令安装 unzip: apt-get install unzip 2. 如果使用 Debian 系列 Linux 系统,执行以下命令安装 unzip: yum install unzip 3 下载并解压 Storm 发布版本 1. 下载 Storm 发行版本,推荐使用 Storm0.8.1: wget https://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip 2. 解压到安装目录下: unzip storm-0.8.1.zip 4 修改 storm.yaml 配置文件 Storm 发行版本解压目录下有一个 conf/storm.yaml 文件,用于配置 Storm。默认配置在 这里可以查看。conf/storm.yaml 中的配置选项将覆盖 defaults.yaml 中的默认配置。以下配 置选项是必须在 conf/storm.yaml 中进行配置的: 1) storm.zookeeper.servers: Storm 集群使用的 Zookeeper 集群地址,其格式如下: storm.zookeeper.servers: - “111.222.333.444″ - “555.666.777.888″ 如果 Zookeeper 集群使用的不是默认端口,那么还需要 storm.zookeeper.port 选项。 2) storm.local.dir: Nimbus 和 Supervisor 进程用于存储少量状态,如 jars、confs 等的本 地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在 storm.yaml 中配置 该目录,如: storm.local.dir: "/home/admin/storm/workdir" 3) java.library.path: Storm 使用的本地库(ZMQ 和 JZMQ)加载路径,默认为” /usr/local/lib:/opt/local/lib:/usr/lib”,一般来说 ZMQ 和 JZMQ 默认安装在/usr/local/lib 下,因此不需要配置即可。 4) nimbus.host: Storm 集群 Nimbus 机器地址,各个 Supervisor 工作节点需要知道哪个 机器是 Nimbus,以便下载 Topologies 的 jars、confs 等文件,如: nimbus.host: "111.222.333.444" 5) supervisor.slots.ports: 对于每个 Supervisor 工作节点,需要配置该工作节点可以运 行的 worker 数量。每个 worker 占用一个单独的端口用于接收消息,该配置选项即用 于定义哪些端口是可被 worker 使用的。默认情况下,每个节点上可运行 4 个 workers, 分别在 6700、6701、6702 和 6703 端口,如: supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 5 启动 Storm 各个后台进程 最后一步,启动 Storm 的所有后台进程。和 Zookeeper 一样,Storm 也是快速失败(fail-fast) 的系统,这样 Storm 才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也 是为什么 Storm 不在进程内保存状态的原因,即使 Nimbus 或 Supervisors 被重启,运行中 的 Topologies 不会受到影响。 以下是启动 Storm 各个后台进程的方式: Nimbus: 在 Storm 主控节点上运行”bin/storm nimbus >/dev/null 2>&1 &”启动 Nimbus 后 台程序,并放到后台执行; Supervisor: 在 Storm 各个工作节点上运行”bin/storm supervisor >/dev/null 2>&1 &”启动 Supervisor 后台程序,并放到后台执行; UI: 在 Storm 主控节点上运行”bin/storm ui >/dev/null 2>&1 &”启动 UI 后台程序,并放到 后台执行,启动后可以通过 http://{nimbus host}:8080 观察集群的 worker 资源使用情况、 Topologies 的运行状态等信息。 注意事项: 启动 Storm 后台进程时,需要对 conf/storm.yaml 配置文件中设置的 storm.local.dir 目录具 有写权限。 Storm 后台进程被启动后,将在 Storm 安装部署目录下的 logs/子目录下生成各个进程的日 志文件。 经测试,Storm UI 必须和 Storm Nimbus 部署在同一台机器上,否则 UI 无法正常工作,因 为 UI 进程会检查本机是否存在 Nimbus 链接。 为了方便使用,可以将 bin/storm 加入到系统环境变量中。 至此,Storm 集群已经部署、配置完毕,可以向集群提交拓扑运行了。 6 向集群提交任务 1. 启动 Storm Topology: storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3 其中,allmycode.jar 是包含 Topology 实现代码的 jar 包,org.me.MyTopology 的 main 方法是 Topology 的入口,arg1、arg2 和 arg3 为 org.me.MyTopology 执行时需要传入的参数。 2. 停止 Storm Topology: storm kill {toponame} 其中,{toponame}为 Topology 提交到 Storm 集群时指定的 Topology 任务名称。 7 提示 以上资源来自于: http://blog.linezing.com/category/storm-quick-start?spm=0.0.0.0.w2ow2L 在安装的过程中,会提示安装 gcc 等等库文件,使用 yum 命令依次进行安装。在解决 依赖库的过程中,建议参考:http://blog.sina.com.cn/s/blog_546abd9f0101cce8.html 3.3 软件列表 1 目录 2 软件包 第四章 Storm Demo 编写 4.1 开发环境搭建 1 安装 eclipse 2 安装 maven 3 安装 maven2eclipse 4 pom.xml 4.2 wordcounter 单词计数器 1 拓扑驱动类 package cn.jd.storm; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; /** * 功能说明: * 设计一个 topology,来实现对一个句子里面的单词出现的频率进行统计。 * 整个 topology 分为三个部分: * WordReader:数据源,负责发送单行文本记录(句子) * WordNormalizer:负责将单行文本记录(句子)切分成单词 * WordCounter:负责对单词的频率进行累加 * * @author 毛祥溢 * Email:frank@maoxiangyi.cn * 2013-8-26 下午 5:59:06 */ public class TopologyMain { /** * @param args 文件路径 */ public static void main(String[] args)throws Exception { // Storm 框架支持多语言,在 JAVA 环境下创建一个拓扑,需 要使用 TopologyBuilder 进行构建 TopologyBuilder builder = new TopologyBuilder(); /* WordReader 类,主要是将文本内容读成一行一行的模式 * 消息源 spout 是 Storm 里面一个 topology 里面的消息生产者。 * 一般来说消息源会从一个外部源读取数据并且向 topology 里面发出 消息:tuple。 * Spout 可以是可靠的也可以是不可靠的。 * 如果这个 tuple 没有被 storm 成功处理,可靠的消息源 spouts 可 以重新发射一个 tuple,但是不可靠的消息源 spouts 一旦发出一个 tuple 就不 能重发了。 * * 消息源可以发射多条消息流 stream。多条消息流可以理解为多中类型 的数据。 * 使用 OutputFieldsDeclarer.declareStream 来定义多个 stream,然后使用 SpoutOutputCollector 来发射指定的 stream。 * * Spout 类里面最重要的方法是 nextTuple。要么发射一个新的 tuple 到 topology 里面或者简单的返回如果已经没有新的 tuple。 * 要注意的是 nextTuple 方法不能阻塞,因为 storm 在同一个线程上 面调用所有消息源 spout 的方法。 * * 另外两个比较重要的 spout 方法是 ack 和 fail。storm 在检测到一 个 tuple 被整个 topology 成功处理的时候调用 ack,否则调用 fail。storm 只对可靠的 spout 调用 ack 和 fail。 */ builder.setSpout("word-reader",new WordReader()); /* WordNormalizer 类,主要是将一行一行的文本内容切割成单词 * * 所有的消息处理逻辑被封装在 bolts 里面。Bolts 可以做很多事情: 过滤,聚合,查询数据库等等。 * Bolts 可以简单的做消息流的传递。复杂的消息流处理往往需要很多 步骤,从而也就需要经过很多 bolts。 * 比如算出一堆图片里面被转发最多的图片就至少需要两步: * 第一步算出每个图片的转发数量。 * 第二步找出转发最多的前 10 个图片。(如果要把这个过程做得 更具有扩展性那么可能需要更多的步骤)。 * * Bolts 可以发射多条消息流, 使用 OutputFieldsDeclarer.declareStream 定义 stream,使用 OutputCollector.emit 来选择要发射的 stream。 * Bolts 的主要方法是 execute, 它以一个 tuple 作为输入,bolts 使用 OutputCollector 来发射 tuple。 * bolts 必须要为它处理的每一个 tuple 调用 OutputCollector 的 ack 方法,以通知 Storm 这个 tuple 被处理完成了,从而通知这个 tuple 的发射 者 spouts。 * 一般的流程是: bolts 处理一个输入 tuple, 发射 0 个或者多个 tuple, 然后调用 ack 通知 storm 自己已经处理过这个 tuple 了。storm 提供 了一个 IBasicBolt 会自动调用 ack。 * * */ builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); /* * 上面的代码和下面的代码中都设定了数据分配的策略 stream grouping * 定义一个 topology 的其中一步是定义每个 bolt 接收什么样的流作 为输入。stream grouping 就是用来定义一个 stream 应该如果分配数据给 bolts 上面的多个 tasks。 * Storm 里面有 7 种类型的 stream grouping * Shuffle Grouping: 随机分组, 随机派发 stream 里面的 tuple,保证每个 bolt 接收到的 tuple 数目大致相同。 * Fields Grouping:按字段分组, 比如按 userid 来分组, 具有同样 userid 的 tuple 会被分到相同的 Bolts 里的一个 task, * 而不同的 userid 则会被分配到不同的 bolts 里的 task。 * All Grouping:广播发送,对于每一个 tuple,所有 的 bolts 都会收到。 * Global Grouping:全局分组, 这个 tuple 被分配 到 storm 中的一个 bolt 的其中一个 task。再具体一点就是分配给 id 值最低的 那个 task。 * Non Grouping:不分组,这 stream grouping 个 分组的意思是说 stream 不关心到底谁会收到它的 tuple。 * 目前这种分组和 Shuffle grouping 是一样 的效果, 有一点不同的是 storm 会把这个 bolt 放到这个 bolt 的订阅者同一个 线程里面去执行。 * Direct Grouping: 直接分组, 这是一种比较特别 的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个 task 处理 这个消息。 * 只有被声明为 Direct Stream 的消息流可以 声明这种分组方法。而且这种消息 tuple 必须使用 emitDirect 方法来发射。 * 消息处理者可以通过 TopologyContext 来 获取处理它的消息的 task 的 id (OutputCollector.emit 方法也会返回 task 的 id)。 * Local or shuffle grouping:如果目标 bolt 有 一个或者多个 task 在同一个工作进程中,tuple 将会被随机发生给这些 tasks。 * 否则,和普通的 Shuffle Grouping 行为一 致。 * */ builder.setBolt("word-counter", new WordCounter(),1).fieldsGrouping("word-normalizer", new Fields("word")); /* * storm 的运行有两种模式: 本地模式和分布式模式. * 1) 本地模式: * storm 用一个进程里面的线程来模拟所有的 spout 和 bolt. 本地模式对开发和测试来说比较有用。 * 你运行storm-starter里面的topology的时候它们 就是以本地模式运行的, 你可以看到 topology 里面的每一个组件在发射什么消 息。 * 2) 分布式模式: * storm 由一堆机器组成。当你提交 topology 给 master 的时候, 你同时也把 topology 的代码提交了。 * master 负责分发你的代码并且负责给你的 topolgoy 分配工作进程。如果一个工作进程挂掉了, master 节点会把认为重新分配到其它 节点。 * 下面是以本地模式运行的代码: * * Conf 对象可以配置很多东西, 下面两个是最常见的: * TOPOLOGY_WORKERS(setNumWorkers) 定义你希望 集群分配多少个工作进程给你来执行这个 topology. * topology 里面的每个组件会被需要线程来执 行。每个组件到底用多少个线程是通过 setBolt 和 setSpout 来指定的。 * 这些线程都运行在工作进程里面. 每一个工作 进程包含一些节点的一些工作线程。 * 比如, 如果你指定 300 个线程,60 个进程, 那么每个工作进程里面要执行 6 个线程, 而这 6 个线程可能属于不同的组件 (Spout, Bolt)。 * 你可以通过调整每个组件的并行度以及这些线 程所在的进程数量来调整 topology 的性能。 * TOPOLOGY_DEBUG(setDebug), 当它被设置成 true 的话, storm 会记录下每个组件所发射的每条消息。 * 这在本地环境调试 topology 很有用, 但是 在线上这么做的话会影响性能的。 */ Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); conf.put("wordsFile","/root/workspace1/com.jd.storm.demo/src /main/resources/words.txt"); conf.setDebug(true); conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); /* * 定义一个 LocalCluster 对象来定义一个进程内的集群。提交 topology 给这个虚拟的集群和提交 topology 给分布式集群是一样的。 * 通过调用 submitTopology 方法来提交 topology, 它接受三个参 数:要运行的 topology 的名字,一个配置对象以及要运行的 topology 本身。 * topology 的名字是用来唯一区别一个 topology 的,这样你然后可 以用这个名字来杀死这个 topology 的。前面已经说过了, 你必须显式的杀掉一 个 topology, 否则它会一直运行。 */ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordCounterTopology", conf, builder.createTopology()); Thread.sleep(1000); cluster.killTopology("wordCounterTopology"); cluster.shutdown(); } } 2 WordReader 读取数据源 package cn.jd.storm; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * * 功能说明: * 主要是将文件内容读出来,一行一行 * * Spout 类里面最重要的方法是 nextTuple。 * 要么发射一个新的 tuple 到 topology 里面或者简单的返回如果已经没 有新的 tuple。 * 要注意的是 nextTuple 方法不能阻塞,因为 storm 在同一个线程上面调 用所有消息源 spout 的方法。 * 另外两个比较重要的 spout 方法是 ack 和 fail。 * storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack, 否则调用 fail。 * storm 只对可靠的 spout 调用 ack 和 fail。 * * @author 毛祥溢 * Email:frank@maoxiangyi.cn * 2013-8-26 下午 6:05:46 */ public class WordReader extends BaseRichSpout { private SpoutOutputCollector collector; private FileReader fileReader; private String filePath; private boolean completed = false; //storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack, 否则调用 fail。 public void ack(Object msgId) { System.out.println("OK:"+msgId); } public void close() {} //storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack, 否则调用 fail。 public void fail(Object msgId) { System.out.println("FAIL:"+msgId); } /* * 在 SpoutTracker 类中被调用,每调用一次就可以向 storm 集群中发射一 条数据(一个 tuple 元组),该方法会被不停的调用 */ public void nextTuple() { if(completed){ try { Thread.sleep(1000); } catch (InterruptedException e) { } return; } String str; BufferedReader reader =new BufferedReader(fileReader); try{ while((str = reader.readLine()) != null){ System.out.println("WordReader 类 读取到一行数据: "+ str); this.collector.emit(new Values(str),str); System.out.println("WordReader 类 发射了一条数据: "+ str); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } } public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) { try { this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.filePath = conf.get("wordsFile").toString(); this.collector = collector; } /** * 定义字段 id,该 id 在简单模式下没有用处,但在按照字段分组的模式下 有很大的用处。 * 该 declarer 变量有很大作用,我们还可以调用 declarer.declareStream();来定义 stramId,该 id 可以用来定义更加复杂 的流拓扑结构 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } } 3 WordNormalizer 切割句子成单词 package cn.jd.storm; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * * 功能说明: * 将一行文本切割成单词,并封装 collector 中发射出去 * * @author 毛祥溢 * Email:frank@maoxiangyi.cn * 2013-8-26 下午 6:05:59 */ public class WordNormalizer extends BaseBasicBolt { public void cleanup() { System.out.println("将一行文本切割成单词,并封装 collector 中发射出去 ---完毕!"); } /** * 接受的参数是 WordReader 发出的句子,即 input 的内容是句子 * execute 方法,将句子切割形成的单词发出 */ public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); System.out.println("WordNormalizer 类 收到一条数据,这条数 据是: "+ sentence); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word = word.toLowerCase(); System.out.println("WordNormalizer 类 收到一条数 据,这条数据是: "+ sentence+"数据正在被切割,切割出来的单词是 "+ word); collector.emit(new Values(word)); } } } /** * 定义字段 id,该 id 在简单模式下没有用处,但在按照字段分组的模式下有 很大的用处。 * 该 declarer 变量有很大作用,我们还可以调用 declarer.declareStream();来定义 stramId,该 id 可以用来定义更加复杂 的流拓扑结构 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } 4 WordCounter 计数器 package cn.jd.storm; import java.util.HashMap; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; /** * * 功能说明: * 实现计数器的功能,第一次将 collector 中的元素存放在成员变量 counters(Map)中. * 如果 counters(Map)中已经存在该元素,getValule 并对 Value 进 行累加操作。 * * @author 毛祥溢 * Email:frank@maoxiangyi.cn * 2013-8-26 下午 6:06:07 */ public class WordCounter extends BaseBasicBolt { private static final long serialVersionUID = 5678586644899822142L; Integer id; String name; //定义 Map 封装最后的结果 Map counters; /** * 在 spout 结束时被调用,将最后的结果显示出来 * * 結果: * -- Word Counter [word-counter-2] -- * really: 1 * but: 1 * application: 1 * is: 2 * great: 2 */ @Override public void cleanup() { System.out.println("-- Word Counter ["+name+"-"+id+"] --"); for(Map.Entry entry : counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue()); } System.out.println("实现计数器的功能 --完畢!"); } /** * 初始化操作 */ @Override public void prepare(Map stormConf, TopologyContext context) { this.counters = new HashMap(); this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } public void declareOutputFields(OutputFieldsDeclarer declarer) {} /** * 实现计数器的功能,第一次将 collector 中的元素存放在成员变量 counters(Map)中. * 如果 counters(Map)中已经存在该元素,getValule 并对 Value 进行 累加操作。 */ public void execute(Tuple input, BasicOutputCollector collector) { String str = input.getString(0); System.out.println("WordCounter 计数器收到单词 "+ str); if(!counters.containsKey(str)){ counters.put(str, 1); }else{ Integer c = counters.get(str) + 1; counters.put(str, c); } } } 5 拓扑流程日志 WordReader 类 发射了一条数据:storm ni great he he xi wang WordReader 类 发射了一条数据:test haha heihei very WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 storm WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 ni WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 great WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 he WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 he WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 xi WordNormalizer 类 收到一条数据,这条数据是: storm ni great he he xi wang 数据正在被切割,切割出来的单词是 wang WordCounter 计数器收到单词 storm WordCounter 计数器收到单词 ni WordCounter 计数器收到单词 great WordCounter 计数器收到单词 he WordCounter 计数器收到单词 he WordCounter 计数器收到单词 xi WordCounter 计数器收到单词 wang WordNormalizer 类 收到一条数据,这条数据是: test haha heihei very WordNormalizer 类 收到一条数据,这条数据是: test haha heihei very 数据正在被切割,切割出来的单词是 test WordNormalizer 类 收到一条数据,这条数据是: test haha heihei very 数据正在被切割,切割出来的单词是 haha WordNormalizer 类 收到一条数据,这条数据是: test haha heihei very 数据正在被切割,切割出来的单词是 heihei WordNormalizer 类 收到一条数据,这条数据是: test haha heihei very 数据正在被切割,切割出来的单词是 very WordCounter 计数器收到单词 test WordCounter 计数器收到单词 haha WordCounter 计数器收到单词 heihei WordCounter 计数器收到单词 very WordReader 类 发射了一条数据:are mao xiang yi jd WordNormalizer 类 收到一条数据,这条数据是: are mao xiang yi jd WordNormalizer 类 收到一条数据,这条数据是: are mao xiang yi jd 数据正在被切割,切割出来的单词是 are WordNormalizer 类 收到一条数据,这条数据是: are mao xiang yi jd 数据正在被切割,切割出来的单词是 mao WordNormalizer 类 收到一条数据,这条数据是: are mao xiang yi jd 数据正在被切割,切割出来的单词是 xiang WordNormalizer 类 收到一条数据,这条数据是: are mao xiang yi jd 数据正在被切割,切割出来的单词是 yi WordNormalizer 类 收到一条数据,这条数据是: are mao xiang yi jd 数据正在被切割,切割出来的单词是 jd WordCounter 计数器收到单词 are WordCounter 计数器收到单词 mao WordCounter 计数器收到单词 xiang WordCounter 计数器收到单词 yi WordCounter 计数器收到单词 jd -- Word Counter [word-counter-2] -- xi: 1 test: 1 heihei: 1 haha: 1 he: 2 storm: 1 wang: 1 jd: 1 xiang: 1 great: 1 are: 1 ni: 1 yi: 1 very: 1 mao: 1 实现计数器的功能 --完畢! 将一行文本切割成单词,并封装 collector 中发射出去 ---完毕! 以上数据来源于:http://www.maoxiangyi.cn/

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 13 金币 [ 分享文档获得金币 ] 12 人已下载

下载文档

相关文档