Kettle3.2源代码分析

0911lizhiyi

贡献于2011-08-12

字数:14523 关键词: 数据挖掘

Kettle3.2源代码分析 2010年10月 廖佳 jliao422@gmail.com 赵瑞雪 zhao_ruixue@qq.com 1. 概念模型 要了解Kettle的执行分为两个层次:Job和Transformation。两个层次的最主要区别在于数据传递和运行方式。 2. 源代码结构 2.1. 总体结构 Kettle源代码由5个source folder组成,分别是:src-core、src、src-ui、src-db、test。 2.1.1. src-core源代码文件夹 2.1.1.1. org.pentaho.di.compatibility包 系统用到的数值类型及对应接口。 2.1.1.2. org.pentaho.di.core.exception包 异常类。 2.1.1.3. org.pentaho.di.core.xml包 XML相关接口及封装类。这些类涉及 2.1.1.4. org.pentaho.di.core.logging包 Log设置。 2.1.1.5. org.pentaho.di.core.plugins包 组件加载。 2.1.1.6. org.pentaho.di.core.row包 行的数据、元信息、操作。 2.1.1.7. org.pentaho.di.core.gui包 界面接口类和一些基础图形类。 2.1.1.8. 其他 org.pentaho.di.Const类维护系统常量及配置文件地址。 2.1.2. src源代码文件夹 包含调度逻辑和具体的执行代码。最重要的两个包为org.pentaho.di.job和org.pentaho.di.trans。 2.1.2.1. org.pentaho.di.job包 Job的每个执行单元称为entry。 org.pentaho.di.job.entry包中存放了每个entry必须继承的基类以及必须实现的接口。 org.pentaho.di.job.entries包中存放了不同entry的具体实现。 2.1.2.2. org.pentaho.di.trans包 Transfromation中的每个执行步骤称为step。 org.pentaho.di.job.entry包中存放了每个entry必须继承的基类以及必须实现的接口。 org.pentaho.di.job.entries包中存放了不同entry的具体实现。 2.1.2.3. org.pentaho.di.kitchen包 Job的命令行执行器类。 2.1.2.4. org.pentaho.di.pan包 Transformation的命令行执行器类。 2.1.3. src-ui源代码文件夹 最主要的就是其下的org.pentaho.di.ui包。 结构如上图所示,其中最重要的3个子包为:org.pentaho.di.ui.job、org.pentaho.di.ui.trans和org.pentaho.di.ui.spoon。 2.1.3.1. org.pentaho.di.ui.job包 每个entry的参数设置面板类、进程对话类及需要继承的基类。 2.1.3.2. org.pentaho.di.ui.trans包 每个step的参数设置面板类、进程对话类及需要继承的基类。 2.1.3.3. org.pentaho.di.ui.spoon包 整个软件的入口对应org.pentaho.di.ui.spoon包中的Spoon类。 选中Job标签后,红框内的编辑区对象对应org.pentaho.di.ui.spoon.job包中的JobGraph类。 选中转换标签后,红框内的编辑区对象对应org.pentaho.di.ui.spoon.trans包中的TransGraph类。 2.1.3.4. 其他 org.pentaho.di.core.KettleEnvironment类负责初始化Kettle运行环境,主要包括调用org.pentaho.di.core.plugins.PluginRegistry类的init()方法加载组件。 2.1.4. src-db源代码文件夹 org.pentaho.di.core.database下包含各数据库对应的类以及对应的元数据类,还包括不同类型数据库必须继承的基类和必须实现的接口。 2.1.5. test源代码文件夹 测试代码。(未阅读) 3. 数据流图 3.1. 顶层数据流图 3.2. 第一层数据流图 3.3. 第二层数据流图 3.3.1. 作业执行 3.3.2. 转换执行 4. Kettle启动 4.1. 顶层 对应org.pentaho.di.ui.spoon包中Spoon类: public static void main(String[] a) throws KettleException { try { ... //3.2.初始化Kettle运行环境 KettleEnvironment.init(); ... //3.3.初始化Spoon界面, 读取配置文件(.xul) staticSpoon = new Spoon(display); staticSpoon.init(null); ... //开启Spoon,enable各个组件 staticSpoon.start(splash, commandLineOptions); }catch(...){ ... } } 4.2. 初始化Kettle运行环境 对应org.pentaho.di.core包中KettleEnvironment类: public static void init(boolean simpleJndi) throws KettleException { //若未曾初始化 if (initialized==null) { //若不存在则创建一个home文件夹 createKettleHome(); //加载home中kettle.properties文件的内容 EnvUtil.environmentInit(); //Log初始设置:容量大小、超时时长 CentralLogStore.init(); … //加载Kettle需要用到的变量值,从Kettle-variables.xml文件 KettleVariablesList.init(); initialized = true; } } 4.2.1. Home文件夹 根据用户系统设置建立“.kettle”文件夹。 该文件夹内存放用户对于系统的喜好配置以及历史信息。这些信息会在启动时被读取。 4.3. 初始化Spoon界面 读取ui\*.xul文件进行部署。 对应Spoon类的init()方法: public void init(TransMeta ti) { //对界面布局进行设置 shell.setLayout(layout); //加入ktr、kjb文件读写监听器 addFileListener(new TransFileListener()); addFileListener(new JobFileListener()); … //加载初始化一些变量 … try { //SwtXulLoader类没有提供src xulLoader = new SwtXulLoader(); … } catch (…) { … } //加载部分固定组件及对应监听器 … } 源代码无法查看的截图: 这里分析的源代码是最新的4.0版本。3.2版本的这部分代码是可以查看的,但两者结构不同: 上图为4.0版本 上图为3.2版本。 5. Job、Transformation界面调用 通过用户界面操作(点击运行按钮)触发监听器,调用Spoon的runFile()方法。 5.1. Spoon类runFile( ) public void runFile() { executeFile(true, false, false, false, false, null, false); } 5.2. Spoon类executeFile( ) 在这一层决定执行Job还是Transformation。 public void executeFile(boolean local, boolean remote, boolean cluster, boolean preview, boolean debug,Date replayDate, boolean safe) { //获取当前活跃的Transformation元信息 TransMeta transMeta = getActiveTransformation(); if (transMeta != null) executeTransformation(transMeta, local, remote, cluster, preview, debug, replayDate, safe); //获取当前活跃的Job元信息 JobMeta jobMeta = getActiveJob(); if (jobMeta != null) executeJob(jobMeta, local, remote, replayDate, safe); } 5.3. Spoon类getActiveTransformation( )、getActiveJob( ) public TransMeta getActiveTransformation() { EngineMetaInterface meta = getActiveMeta(); if (meta instanceof TransMeta) { return (TransMeta) meta; } return null; } getActiveJob()类的实现同getActiveTransformation()。 如上图,JobMeta和TransMeta都实现了EngineMetaInterface。 上图可见,EngineMetaInterface包含Job、Transformation的整体操作。 5.4. Spoon类getActiveMeta( ) public EngineMetaInterface getActiveMeta() { if (tabfolder == null) return null; TabItem tabItem = tabfolder.getSelected(); if (tabItem == null) return null; //通过当前活跃的Tab标签确定返回类型。 TabMapEntry mapEntry = delegates.tabs.getTab(tabfolder.getSelected()); EngineMetaInterface meta = null; if (mapEntry != null) { if (mapEntry.getObject() instanceof TransGraph) meta = (mapEntry.getObject()).getMeta(); if (mapEntry.getObject() instanceof JobGraph) meta = (mapEntry.getObject()).getMeta(); } return meta; } 下图红色方框内为活跃的Tab。 5.5. 选择执行时序图 下图总结了4.1-4.4节内容,即系统如何判定执行Job还是Transformation。 5.6. Job调用后续步骤 最终转交给JobGraph负责执行。 实现方法同4.7。 5.7. Transformation调用后续步骤 最终转交给TransGraph负责执行。 5.7.1. Spoon类executeTransformation( ) public void executeTransformation(…) { new Thread() { … //利用trans执行代理执行trans delegates.trans.executeTransformation(transMeta, local, remote, cluster, preview, debug, replayDate, safe); … }.start(); } 5.7.2. SpoonTransformationDelegate类executeTransformation( ) public void executeTransformation(…){ … //获取当前活跃的trans TransGraph activeTransGraph = spoon.getActiveTransGraph(); … //将配置设置入executionConfiguration后调用TransGraph实例执行 activeTransGraph.start(executionConfiguration); … } 6. Job执行 6.1. 相关类和接口 6.1.1. JobGraph 维护整个Job编辑区的信息和相应操作。 主要成员变量: private JobMeta jobMeta; 由Job编辑面板动态维护 private Repository rep; private Job parentJob; private JobTracker jobTracker;用于跟踪日志记录 private Date startDate, endDate, currentDate, logDate, depDate; private boolean active, stopped;状态位 private List sourceRows; 返回结果的数据内容 private Result result;每次执行完一个jobentry返回结果 6.1.2. JobMeta 维护整个Job的元数据。 主要成员变量: protected String name; protected String filename; public List jobentries;保存jobentry列表 public List jobhops;保存jobentries之间的链接关系。 public List databases; 6.1.3. JobEntryInterface 每个具体org.pentaho.di.job.entries包下的 entry类需要实现的接口。 包含execute()方法。 6.1.4. Result 每一个jobEntryInterface的实现类在完成相应功能时,返回结果的类型。 主要成员变量: private boolean result;执行是否出现异常 private int exitStatus; 执行结果状态 private List rows;一个jobEntry完成处理后的数据(若存在) private Map resultFiles; 6.1.5. JobEntryCopy 维护每一个不同entry或者相同entry的不同副本的信息 主要成员: private JobEntryInterface entry;具体entry,执行入口 private int nr;副本数,一个编辑区里可以出现多个相同组件 private boolean selected; private Point location;图标位置 private boolean draw; private long id; 6.2. Job执行过程时序图 下图描述了上面各类之间的总体调用关系。 6.3. 代码执行说明 6.3.1. JobGraph类start( ) 该类主要功能是实例化job、开启job的线程。主要代码如下: job = new Job(log, jobMeta.getName(), jobMeta.getFilename(), null); …… job.start(); 6.3.2. JobGraph类start( ) 设置状态位,调用execute方法1,部分代码代码如下: public void run(){ …… stopped=false; finished=false; initialized = true; …… result = execute(false); …… } 6.3.3. Job类execute( )方法1 主要工作是从JobMeta的JobHopMeta找到job入口jobentry信息,根据开始条件调用真正执行jobentry的execute方法2,代码如下所示: startpoint=jobMeta.findJobEntry(JobMeta.STRING_SPECIAL_START, 0, false); // 找到Job开始组件 JobEntrySpecial jes = (JobEntrySpecial) startpoint.getEntry(); // JobEntrySpecial是启动job的job项目 Result res = null; while ( (jes.isRepeat() || isFirst) && !isStopped()){ //符合开始条件时,调用execute方法2 isFirst = false; res = execute(0, null, startpoint, null, Messages.getString("Job.Reason.Started")); } 6.3.4. Job类execute( )方法2 主要功能是根据参数startpoint,提取对应的jobentry,执行对应的jobentry操作,再根据JobMeta的hop信息依次得到下一个jobentry,嵌套调用execute方法2调用,代码如下: JobEntryInterface jei = startpoint.getEntry(); JobEntryInterface cloneJei = (JobEntryInterface)jei.clone(); //以下是执行JobEntryInterface的实现类的execute()方法。 final Result result = cloneJei.execute(prevResult, nr, rep, this); //根据jobMeta的Hop信息找到下一跳的个数 int nrNext = jobMeta.findNrNextJobEntries(startpoint); for (int i=0;i rowsets; private List steps 其中最重要的是rowsets、steps。rowsets保存了所有hop对应的行元数据和数据信息。List steps封装了一个step的主要内容。 7.1.3. TransMeta 描述了整个Trans的元数据信息。 主要的属性成员有: private List steps; private List hops; private String name; private Result previousResult;上一个jobentry的执行结果。 private List resultRows;这次trans执行后的数据结果。 private List resultFiles; resultRows成员将作为result比部分返回多行的元数据和数据(如果有的话)需要返回数据结果时。把resultRows加入Result结果的rows列表,并返回。 7.1.4. StepMetaDataCombi 提取了一个step所需的主要信息。 public class StepMetaDataCombi { public StepMeta stepMeta; public String stepname; public int copy; public StepInterface step; public StepMetaInterface meta; public StepDataInterface data; } 7.1.5. TransHopMeta 描述hop信息。 7.1.6. StepMeta 描述step的公有基本信息(stepid,stepname),对于每一个具体的step,由成员变量StepMetaInterface step来描述。 7.1.7. StepInterface 主要成员函数: processRow()对一行的数据处理。 putRow()把处理后的数据放入下一个step的inputrowsets中。 7.1.8. StepBase 实现了StepInterface是各step具体实现类的基类。完成了公用的处理函数,如putRow(),但是对于更具体的processRow()在StepBase的子类中。StepBase的主要成员有 public ArrayList inputRowSets,outputRowSets; StepBase的子类每次从inputRowSets中取出一行数据,向outputRowSets中写入一行数据。 7.1.9. StepDataInterface 与step相关的数据信息。比如行的元数据信息。StepMetaInterface的实现类是与具体step相关的元数据信息,与StepMeta配合使用,共同描述具体step的元数据信息。 7.1.10. RowSet RowSet类中包含源step,目标step和由源向目标发送的一个rowMeta和一组data。其中data数据是以行为单位的队列(queArray)。一个RowSet作为此源step的outputrowsets的一部分。同时作为目标step的inputRowsets一部分。源Step每次向队列中写一行数据,目标step每次从队列中读取一行数据。 7.1.11. RowMetaAndData public class RowMetaAndData implements Cloneable{ private RowMetaInterface rowMeta;//行的元数据,描述了每行的数据名字,数据类型。 private Object[] data;//数据 } 7.2. 执行过程概述 Trans的执行机制是搭建一个结构,使得每一个step能够从自己的inputRowsets读,处理一行,将结果输出到自己的outputRowsets中。 注意:一个rowset对象既属于前一个step成员outputRowsets的一部分,也属于后一个对象的inputRowsets的一部分。所有的rowset信息都在Trans对象中以List形式维护。 7.3. Trans执行过程时序图 由于trans可以有TransGraph实例化,也可以由JobEntryTrans实例化。但基本过程是一样的,先实例化TransMeta,再实例化Trans,最终调用trans的start方法。 由TransGraph实例化如下图所示: 由JobEntryTrans实例化,如下图所示: 7.4. Trans代码解释 7.4.1. JobEntryTrans类execute( ) 首先获取元数据,然后以此作为参数实例化trans TransMeta transMeta = getTransMeta(rep); …… Trans trans = new Trans(transMeta); …… trans.execute(args); 7.4.2. Trans类execute( ) 具体执行前需要进行准备工作 public void execute(String[] arguments) throws KettleException{ prepareExecution(arguments); startThreads(); } 7.4.3. Trans的prepareExecution() 搭建以下结构结构。 (1)、对每一个step根据hop信息进行找到下一个step或多个step。 (2)、对于每一个this step和nex tstep生成一个RowSet对象,作为缓存供this step写,同时供next step读取数据。 (3)、把此RowSet对象加入到Trans的List成员中保存。 List hopsteps=transMeta.getTransHopSteps(false); 得到step列表 …… 对每一个step进行如下设置 for (int i=0;i nextSteps = transMeta.findNextSteps(thisStep); int nrTargets = nextSteps.size(); for (int n=0;n的相应的rowset加入到step的inputRowSets,和outputRowSets中。 combi.step = step; steps.add(combi); 7.4.4. Trans类startThreads( ) 打开了所有的step线程,核心代码如下: for (int i=0;i信息有两部分: 公有信息:每个entry都具有的信息。如name,description,jobentrytype等,由JobEntryBase保存。 私有信息:每个具体step继承了JobEntryBase并各自特有的信息,如开始entry的xml中其他信息,如下图所示。 每个标签内容对应界面层元素如下图 8.1.3. hops标签 Hops信息主要有:源和目标entry信息;链接的状态,是否开启;链接的类型:条件执行还是无条件执行等。如下图所示: 8.2. .ktr文件格式 一个UniqueRiws.ktr 文件的Transformation顶层有Transformation信息 , 信息。信息组成。 8.2.1. info标签 节点包括了一个Trans的信息。如trans名字,trans描述信息,日志设置信息,设置的最大RowSet数量,修改时间。 对应的界面层设置是: 8.2.2. order标签 对象是Step之间的链接信息,包括一个链接的源Step和目标Step,以及链接的使能状态。 8.2.3. step标签 节点信息描述了一个节点的信息。包括两部分信息: 1、每个step都有的信息:如stepname,step类型信息,所在面板位置信息, 2、具体step特有的信息:如去重是否对重复的行计数,以及计数变量名。 文本输出的格式,分隔符,数据名和数据类型信息。 以下是去重step对应的内容。 对应的GUI设置为: 9. 配置文件 9.1. 语言 位于src-ui中org.pentaho.di.ui.core.database.dialog.messages以及org.pentaho.di.ui.core. dialog.messages中messages_xx_xx.properties文件 包含各窗口文本内容在选择不同语言(中文、英语等)后对应的不同提示语句。 9.2. 图标 9.2.1. 图标位置 位于ui\images文件夹中。 9.2.2. 配置文件 ui\下LAF.properties文件,内容如下: splash_image=ui/images/kettle_splash.png 其中splash_image为程序中用到的property的key值,返回值为ui/images/kettle_splash.png。 9.2.3. 代码说明 1、设置文件: package org.pentaho.di.laf; public class OverlayPropertyHandler implements PropertyHandler { protected static final String propFile = "ui/laf.properties"; …… } 2、使用: final Image kettle_image = ImageUtil.getImageAsResource(display, BasePropertyHandler.getProperty("splash_image")); 9.3. 系统常量 org.pentaho.di.core包中Const类。 10. 存在问题 Kettle隐藏了部分核心代码,会对以后的开发带来影响。

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 20 金币 [ 分享文档获得金币 ]
3 人已下载

下载文档

相关文档