Kafka in Action: Real-Time Transfer from RDBMS to Hadoop in Seven Steps
<p>This article is a case study of Kafka working with Flume, taking a close look at how the pair streams RDBMS data into a Hive table on HDFS in real time.</p>
<p>For enterprises that want to ingest data into Hadoop quickly, Kafka is a great choice. What is Kafka? Kafka is a distributed, scalable, reliable messaging system that integrates applications and data streams through a publish-subscribe model. It is also a key component of the Hadoop technology stack, with strong support for real-time data analytics and for monetizing IoT data.</p>
<p>This article is aimed at a technical audience. It walks through how Kafka moves a data stream from an RDBMS (relational database management system) into Hive, illustrated with a real-time analytics use case. For reference, the component versions used in this article are Hive 1.2.1, Flume 1.6, and Kafka 0.9.</p>
<h3><strong>Where Kafka Fits: The Overall Solution Architecture</strong></h3>
<p>The diagram below shows the overall architecture of the solution: <strong><em>Kafka</em></strong> combined with <strong><em>Flume</em></strong>, plus Hive's transaction feature, delivers the transaction data from the RDBMS into the target <strong><em>Hive</em></strong> table.</p>
<p><img src="https://simg.open-open.com/show/51badefd01422a252271447921c5a887.png"></p>
<h3><strong>Seven Steps to Real-Time Data Ingestion into Hadoop</strong></h3>
<p>Now let's dive into the details of the solution and show how to stream data into Hadoop in a few steps.</p>
<p>1. Extract data from the RDBMS</p>
<p>Every relational database has a log file that records the latest transactions. The first step of the solution is to capture these transactions, making sure they are in a format that Hadoop can accept.</p>
<p>2. Set up the Kafka producer</p>
<p>The process that publishes messages to a Kafka topic is called a "producer." A "topic" is a category of messages that Kafka maintains, and the RDBMS data will be converted into a Kafka topic. In this example, the source is a database serving the entire sales team, and its transaction data is published as a Kafka topic. The following steps set up the Kafka producer:</p>
<pre> $ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic SalesDBTransactions
Created topic "SalesDBTransactions".
$ bin/kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181
SalesDBTransactions</pre>
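<p>Before wiring Flume to the topic, it can help to confirm that messages actually round-trip through it. Below is a minimal smoke test, assuming the sandbox broker listens on sandbox.hortonworks.com:6667 (the broker address used in step 6) and using a made-up sample row in the same comma-delimited format as the real feed:</p>
<pre> # Publish one test record to the new topic (press Ctrl+C to exit the producer)
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
0,"Test User","test@example.com","1 Test St.","Test Co"

# Read the topic back from the beginning to verify delivery
$ bin/kafka-console-consumer.sh --zookeeper sandbox.hortonworks.com:2181 --topic SalesDBTransactions --from-beginning
0,"Test User","test@example.com","1 Test St.","Test Co"</pre>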
<p>3. Set up Hive</p>
<p>Next, create a Hive table that is ready to receive the sales team's database transactions. In this example we will create a customers table:</p>
<pre> [bedrock@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> use raj;
create table customers (id string, name string, email string, street_address string, company string)
partitioned by (time string)
clustered by (id) into 5 buckets
stored as orc
location '/user/bedrock/salescust'
TBLPROPERTIES ('transactional'='true');</pre>
<p>To make sure Hive can handle transactional data, the following setting is required in the Hive configuration:</p>
<pre> hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</pre>
<p>4. Set up a Flume agent for the Kafka-to-Hive stream</p>
<p>Now let's look at how to create a Flume agent that picks up data from the Kafka topic and sends it to the Hive table.</p>
<p>Before starting the Flume agent, set up the runtime environment with these steps:</p>
<pre> $ pwd
/home/bedrock/streamingdemo
$ mkdir flume/checkpoint
$ mkdir flume/data
$ chmod 777 -R flume
$ export HIVE_HOME=/usr/hdp/current/hive-server2
$ export HCAT_HOME=/usr/hdp/current/hive-webhcat
$ pwd
/home/bedrock/streamingdemo/flume
$ mkdir logs</pre>
<p>Then create a log4j properties file as follows:</p>
<pre> [bedrock@sandbox conf]$ vi log4j.properties
flume.root.logger=INFO,LOGFILE
flume.log.dir=/home/bedrock/streamingdemo/flume/logs
flume.log.file=flume.log</pre>
<p>Then configure the following file for the Flume agent:</p>
<pre> $ vi flumetohive.conf
flumeagent1.sources = source_from_kafka
flumeagent1.channels = mem_channel
flumeagent1.sinks = hive_sink
# Define / Configure source
flumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
flumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181
flumeagent1.sources.source_from_kafka.topic = SalesDBTransactions
flumeagent1.sources.source_from_kafka.groupID = flume
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sources.source_from_kafka.interceptors = i1
flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestamp
flumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000
# Hive Sink
flumeagent1.sinks.hive_sink.type = hive
flumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083
flumeagent1.sinks.hive_sink.hive.database = raj
flumeagent1.sinks.hive_sink.hive.table = customers
flumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2
flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%M
flumeagent1.sinks.hive_sink.batchSize = 10
flumeagent1.sinks.hive_sink.serializer = DELIMITED
flumeagent1.sinks.hive_sink.serializer.delimiter = ,
flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company
# Use a channel which buffers events in memory
flumeagent1.channels.mem_channel.type = memory
flumeagent1.channels.mem_channel.capacity = 10000
flumeagent1.channels.mem_channel.transactionCapacity = 100
# Bind the source and sink to the channel
flumeagent1.sources.source_from_kafka.channels = mem_channel
flumeagent1.sinks.hive_sink.channel = mem_channel</pre>
<p>5. Start the Flume agent</p>
<p>Start the Flume agent with the following command:</p>
<pre> $ /usr/hdp/apache-flume-1.6.0/bin/flume-ng agent -n flumeagent1 -f ~/streamingdemo/flume/conf/flumetohive.conf</pre>
<p><img src="https://simg.open-open.com/show/30d9ca8e88c0c05327b9c8c6fef46d31.png"></p>
<p>6. Start the Kafka stream</p>
<p>Below, as an example, is a set of simulated transaction messages; in a real system these would be generated by the source database. They might, for instance, come from an Oracle stream replaying the SQL transactions committed to the database, or from GoldenGate.</p>
<pre> $ cd /usr/hdp/2.4.0.0-169/kafka
$ bin/kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic SalesDBTransactions
1,"Nero Morris","porttitor.interdum@Sedcongue.edu","P.O. Box 871, 5313 Quis Ave","Sodales Company"
2,"Cody Bond","ante.lectus.convallis@antebibendumullamcorper.ca","232-513 Molestie Road","Aenean Eget Magna Incorporated"
3,"Holmes Cannon","a@metusAliquam.edu","P.O. Box 726, 7682 Bibendum Rd.","Velit Cras LLP"
4,"Alexander Lewis","risus@urna.edu","Ap #375-9675 Lacus Av.","Ut Aliquam Iaculis Inc."
5,"Gavin Ortiz","sit.amet@aliquameu.net","Ap #453-1440 Urna. St.","Libero Nec Ltd"
6,"Ralph Fleming","sociis.natoque.penatibus@quismassaMauris.edu","363-6976 Lacus. St.","Quisque Fringilla PC"
7,"Merrill Norton","at.sem@elementum.net","P.O. Box 452, 6951 Egestas. St.","Nec Metus Institute"
8,"Nathaniel Carrillo","eget@massa.co.uk","Ap #438-604 Tellus St.","Blandit Viverra Corporation"
9,"Warren Valenzuela","tempus.scelerisque.lorem@ornare.co.uk","Ap #590-320 Nulla Av.","Ligula Aliquam Erat Incorporated"
10,"Donovan Hill","facilisi@augue.org","979-6729 Donec Road","Turpis In Condimentum Associates"
11,"Kamal Matthews","augue.ut@necleoMorbi.org","Ap #530-8214 Convallis, St.","Tristique Senectus Et Foundation"</pre>
<p><img src="https://simg.open-open.com/show/f2fb35069c051fed447193d3865dd930.png"></p>
<p>7. Receive the data in Hive</p>
<p>With all of the steps above complete, you can now send data from Kafka and watch it arrive in the Hive table within a few seconds.</p>
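<p>As a quick check, and assuming the raj.customers table created in step 3, you can query the table from beeline to confirm that rows are flowing in (the queries below are illustrative and not part of the original walkthrough):</p>
<pre> [bedrock@sandbox ~]$ beeline -u jdbc:hive2:// -n hive -p hive
0: jdbc:hive2://> -- count the rows that have landed so far
0: jdbc:hive2://> select count(*) from raj.customers;
0: jdbc:hive2://> -- spot-check a few delivered records
0: jdbc:hive2://> select id, name, company from raj.customers limit 5;</pre>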
<p>Source: http://www.thebigdata.cn/Hadoop/31071.html</p>