Lexst :大规模数据处理系统

天空的上空

贡献于2012-06-30

字数:0 关键词: 分布式/云计算/大数据

www.lexst.com 1 Lexst:大规模数据处理系统 前言..........................................................................3 第1章 设计概述...............................................................3 1.1 多集群...............................................................3 1.2 弱中心化管理.........................................................4 1.3 以普通硬件为设计标准.................................................4 1.4 动态可伸缩的运行架构.................................................4 1.5 以大规模数据追加/读取为主,支持小规模数据更新........................4 1.6 SQL..................................................................4 1.7 行存储? 列存储?......................................................5 1.8 高性能计算...........................................................5 1.9 扩展接口.............................................................5 1.10 日志................................................................5 第2章 架构...................................................................5 2.1 Top 节点............................................................. 5 2.2 Home 节点............................................................ 6 2.3 Log 节点............................................................. 6 2.4 Data 节点............................................................ 6 2.5 Work 节点............................................................ 6 2.6 Build 节点........................................................... 6 2.7 Call 节点............................................................ 7 2.8 控制台...............................................................7 2.9 全局时间.............................................................7 2.10 元信息..............................................................7 第3章 SQL....................................................................7 3.1 数据库和不可修改性...................................................8 3.2 数据命名空间(data naming space)....................................8 3.3 简单数据库类型(simple database type SDBT)..........................8 3.4 运算符...............................................................9 3.5 SQL 语句............................................................. 9 3.6 分布式计算语句:DC和ADC............................................11 第4章 数据块(data chunk)..................................................11 4.1 基本说明............................................................11 4.2 跳点数(Tops)......................................................11 4.3 数据读取............................................................11 4.4 数据写入............................................................12 4.5 缓存块备份..........................................................12 4.6 主块备份............................................................12 4.7 完整性保证..........................................................12 4.8 元信息..............................................................13 4.10 主块同步...........................................................13 4.11 块数量检查.........................................................13 www.lexst.com 2 4.12 主块冲突...........................................................13 4.13 清理垃圾数据.......................................................14 第5章 网络通信协议..........................................................14 5.1 流通信..............................................................14 5.2 包通信..............................................................14 5.3 稳定的包通信(Keep UDP)............................................14 5.4 远程过程调用(RPC).................................................14 5.5 通信安全............................................................15 5.6 总结................................................................15 第6章 计算模型..............................................................15 6.1 数据分布计算模型:diffuse/aggregate.................................15 6.2 数据聚簇............................................................15 6.3 集群数据聚合模型:marshal/educe.....................................16 第7章 容错..................................................................16 7.1 故障诊断............................................................16 7.2 管理节点故障恢复过程................................................16 7.3 任务节点故障恢复过程................................................16 第8章 负载..................................................................17 8.1 负载检测............................................................17 8.2 影响数据计算的几个问题..............................................17 8.2.1 硬盘..........................................................17 8.2.2 网络带宽......................................................17 8.2.3 移动计算代替移动数据计算......................................17 8.2.4 数据平衡......................................................18 第9章 启动和配置............................................................18 9.1 启动................................................................18 9.2 配置................................................................18 9.2.1 基本配置......................................................18 9.2.2 算法配置......................................................19 第10 章 一个分布式搜索引擎实现...............................................20 10.1 数据库和表定义.....................................................20 10.2 上传数据...........................................................21 10.3 编写 marshal/educe................................................. 22 10.4 编写 diffuse 实现类.................................................26 10.5 编写 aggregate 实现类...............................................29 10.6 搜索请求...........................................................34 10.7 总结...............................................................37 第11 章 未来和发展...........................................................37 11.1 非结构化数据.......................................................37 11.2 计算模型...........................................................38 11.3 更多数据库功能.....................................................38 11.4 多核 CPU 编程.......................................................38 11.5 低功耗硬件.........................................................38 11.6 多版本.............................................................38 www.lexst.com 3 结束语.......................................................................38 前言 在我们的现实世界中,有许多巨大的数据集合。典型如黄页、全球商品名录;电信公司 的通话、短消息记录。更大的如构成搜索引擎的关键字词库、文档仓库。保存这些数据需要 很多的存储空间,同时也要求极快的检索速度。面对这些数据,标准的数据库系统已经不能 胜任。在互联网领域,随着各种类型的数据内容日益多样化,更大存储量,更快计算速度, 更低计算成本的需求也在增加。基于以上原因,我们根据新兴市场的需求,讨论了传统数据 管理产品的特点,分析了许多成功的经验,开发了这套数据管理产品,实现了以下目标: 1. 架构简单,高稳定性和可靠性 2. 可伸缩的运行模型 3. 海量存储能力(PB级) 4. 高效率数据计算 5. 支持 SQL 6. 布署简单,维护成本低 在本文以下部分,我们将介绍这套产品的诸多方面。在文章最后,我们实现了一个分布 式搜索引擎的案例,供大家讨论,欢迎大家提出不同的意见。 第1章 设计概述 Lexst 是一个分布式的大规模数据并行计算/存储系统。在这个基础上,可以搭建包括 电子邮箱、搜索引擎、云计算在内的诸多互联网服务;也可以按照某种业务逻辑,将其建成 一个海量数据计算平台。Lexst 以开放源码形式发布,遵循LGPL 协议,采用Java 语言编码, 在需要加强计算能力和本地系统支持的部分,由 C语言实现。到目前为止,Lexst 的编码、 测试、布署、运行都是在 Linux 平台完成。在产品演进过程中,我们围绕着产品的系统性能 、 可靠性、扩展性、可维护性、兼容、应用、承载能力,进行了长时间的讨论和研究,并最终 确定了产品的主要发展方向。 1.1 多集群 做为一个分布式数据处理平台,Lexst 最显著的特点之一就是支持多个集群跨地域协同 工作。在传统的单集群系统中,中心服务器维护着整个集群的运行,如果一味提高集群中服 务器的数量,网络 IO、中心服务器的承载能力都会受到影响。而采用多集群协同工作方式 后,则能够化解了这个可能成为瓶颈的问题,实现了更大规模的数据处理,这比单集群的处 理能力要提高很多。另一方面,由于跨地域处理允许通过互联网交换数据,为分布在异地的 数据集合协同计算提供了方便。这种设计简化了计算流程,减少了人工参与成分,提高了工 作效率。 www.lexst.com 4 1.2 弱中心化管理 Lexst 将服务器按功能划分为中心服务器和任务服务器。中心服务器是管理整个集群的 核心。与强调中心服务器管理的设计理念不同,Lexst 采用了弱中心化的管理设计。在Lexst 集群中,中心服务器(Top、Home 节点)只承担简单快速的管理任务,任务服务器采用自维持 方式工作,避免与中心服务器进行过多的通信。这样做的好处是:中心服务器的压力减轻, 任务处理速度可以更快,服务器硬件配置也可以相应降低。任务服务器由于自维持的特性, 既使在中心服务器宕机情况下,也能维持一段时间的正常运行,直到再次发起对中心服务器 的请求。而这段时间内,中心服务器可能已经恢复。弱中心化管理模式进一步增强了集群的 稳定性。 1.3 以普通硬件为设计标准 所有运行 Lexst 的硬件都是以普通的个人计算机为标准。这种设计大幅度降低了用户的 硬件投入成本,但是同时,另一个问题随之而来:当一个集群中有数百台乃至上千台这样的 硬件在运行的时候,发生故障的频率将大大增加。所以,故障在 Lexst 设计中是被做为正常 情况处理,而不是异常。集群运行过程中,通过服务器自检和中心服务器追踪等多种方式, 实时发现故障,并将故障点隔离,同时报警通知管理人员处理。 1.4 动态可伸缩的运行架构 Lexst 要求集群中每一台服务器的加入和撤除,都不能影响集群的整体运行。这种结构 特点保证了系统拥有足够的稳定性。加上与容错管理集成,能够为运行环境提供更可靠的安 全保障。 1.5 以大规模数据追加/读取为主,支持小规模数据更新 我们在产品设计之前,参考和分析了许多大规模数据处理的应用场景,发现了一个共同 的现象:大批量的数据添加和查询操作是主要的行为,而删除和修改数据的情况很少发生。 这种需求现象导致了一系列设计的重新定位。首先,为保证大数据量和高响应度,向存储系 统增加数万条记录或者数兆的数据,必须在极短时间内完成。这个时间以秒为单位。查询的 响应要求更高,范围定义在毫秒至百毫秒之间。这包括数据计算和磁盘 IO的响应时间。删 除和修改的时间要求可以放宽,但是也必须做到高效。另一方面,为保证存储的数据能够在 分布式环境下即时生效,也就是存储和查询的时间间隔是 0,我们必须改变原有的分布检索 机制,而这种改变又不能以增加系统压力为代价,这是一项难度颇大的工作。经过不懈努力 , 我们做到了。“即时检索”这项特性意义重大,在很多实时应用场景中都需要使用到这项技 术。 1.6 SQL 兼容 SQL 是设计需求中最重要的目标之一。为实现这一目标,我们在设计中参考和比较 了很多应用案例和技术实现,提出了多种设计方案,并对其中一些方案进行分析验证和改进 。 测试结果表明,SQL 和大规模数据处理并非不可调和,只要应用得当,SQL 诸多理念在大规 模数据计算环境中仍然可以得到很好应用,这对未来产品在 SQL 框架下的延续非常重要。但 是SQL 也有不足之处,目前主要是分布式环境下查询速度缓慢和大数据处理能力有限,对此 www.lexst.com 5 问题,我们在 SQL 基础上做了补充,并取得不错的试验效果。 1.7 行存储? 列存储? 受上述影响的还有磁盘存储格式。面对行存储和列存储的选择,通过多次对比试验后, 我们最终确定采纳行存储做为 Lexst 的物理存储格式。虽然列存储在保证数据读取方面拥有 优势,但是行存储在保证数据高效写入,以及数据有效性、完整性、严谨性方面,是列存储 欠缺的。并且在大量对比试验中我们发现,如果对数据结构进行适当的调整,在大数据读取 和处理上,行存储可以做到和列存储一样高效,甚至超越。这些特性将在第 10 章讲述。 1.8 高性能计算 当前应用环境下,仅仅依靠提升计算机硬件性能已经不能满足大规模数据处理的需要, 优秀的数据计算算法成为高性能计算的关键。在产品研发过程中,我们投入了很大的力量加 强这方面的工作,并完成了多种数据计算算法。这些算法为 Lexst 提供了强有力的支持,保 证了对数据的高效处理能力。算法的具体内容将在第 6章介绍。 1.9 扩展接口 我们尽力保持让用户通过 SQL 操控语句的方式控制集群。这种方式简单快捷,用户也容 易掌握。但是在一些更复杂场景下,比如用户希望实现自己的商业规则,更灵活处理数据, 增加私有保密技术的时候,SQL 操控语句则不能胜任这样的工作,需要使用可扩展接口。我 们在 Lexst 的几个关键位置提供了这样的接口,由用户自己实施应用方案。未来如果有需要 , 仍然会增加这样的接口。 1.10 日志 单机环境下,追踪程序错误可以在代码中设置断点判断,但在集群环境下,这种方式已 经不适用,发现一台机器在运行中产生的错误主要依靠日志记录。所以,日志在这里除了记 录运行信息,更主要起到发现和定位程序错误的作用。在我们开发过程中,日志记录了大量 的错误信息,并为我们快速定位和解决这些问题提供很好的帮助。日志在系统中是不可或缺 的一个组成部分。 第2222章 架构 Lexst 集群由节点组成,按功能划分,代表着不同的处理任务。每个节点实际是操作系 统根用户层的一个任务进程。Lexst 要求每类节点在集群中至少存在一个,否则不能保证集 群正常运行。节点之间通过网络进行通信,网络协议基于标准的 TCP/IP 协议。Lexst 允许 在一台物理计算机上运行多个节点,但不推荐在生产环境这样做。 2.1 Top节点 Top 节点是多集群状态下,管理和监控每个集群 Home 节点的节点。同时,做为最核心 www.lexst.com 6 的节点,Top 还管理着数据库信息、用户账号、数据块 ID的分配。总之,没有 Top 节 点 , 所有节点将停止运行。为保证集群的正常工作,我们通常要求 Top 节点有一个工作节点和两 个备用节点共同运行。备用节点只监控工作节点,同时定期复制工作节点的全部数据。当工 作节点发生故障时,备用节点会发出警告,然后通过网络进行协商,推举一个新节点代替故 障节点并且继续监控 Home 节点,而Home 节点会收到新 Top 节点的通知,重新注册到新 Top 节点下继续运行。 2.2 Home节点 与Top 节点类似,Home 节点是管理和监控所在集群所有节点的节点,是单集群环境中 的核心节点。所不同的是,Home 节点的管理和监控工作更加多样化和复杂。这些工作包括: 追踪各类节点的运行状态、存储各类节点的元信息、判断和隔离故障节点、控制数据块的分 发、保证集群负载均衡等。与 Top 节点一样,Home 节点也要求有一个工作节点和两个备用 节点,它的自管理流程与 Top 节点相同。 2.3 Log节点 Log 节点是日志存储节点,它记录集群所有节点的日志信息(不包括自己)。 各节点将 日志通过网络传送到 Log 节点,LOG 节点根据对方节点类型和 IP地址建立目录,保存到磁 盘上。LOG 节点是所有节点中任务最简单的节点。 2.4 Data节点 Data 节点是数据管理节点,它的任务包括对数据空间的建立、检查、回收,提供数据 的存储、分发、优化、监测、容错等工作。如果有用户定义, Data 节点还承担一部分分布 式计算任务。另外,Data 节点引入了“级别”概念,分为“主节点”(primary node)和“从 节点”(slave node)。主节点负责对数据的添加、查询、删除、修改,从节点负责备份主节 点数据和查询数据的工作。Data 节点是 Lexst 集群中最复杂的节点。 2.5 Work节点 Work 节点是为实现数据的分布计算而专门定制的节点。分布式计算做为 Lexst 设计中 最重要一环,在 Work 节点定义了一套分布算法接口。用户可以按照需要扩展这个接口,实 现自己的分布算法任务。在实际应用中,Work 节点通常与 Data 节点合作,共同完成分布计 算任务。 2.6 Build节点 Build 节点是为重组、整合集群中的零散数据而设计的节点。通常,经过 Build 节点整 合后的数据,计算效率更为高效。可以这么理解,Build 节点是大规模数据计算前的加速器 。 为简化整合过程,Lexst 为Build 节点定义了一套数据整合接口,用户可以在此基础上扩展 自己的应用,实现自己的数据优化。 www.lexst.com 7 2.7 Call节点 Call 节点属于“桥梁”节点。对外,它接受来自控制台或者其它 API 的调用;对内, 它定时收集 Data、Work、Build 节点的运行状态信息,组织 Data 节点和 Work 节点,进行分 布式计算工作。Call 节点通常是做为一个服务进程独立运行,也可以做为一个 WEB 服务器 (如 Tomcat)的子任务,绑定在 WEB 服务器上运行。 2.8 控制台 在Lexst 的规范定义中,控制台不属于节点范畴,它是一个由用户驱动的数据操作终端 , 是Lexst 软件集中的一个组件。所有用户指令都可以通过控制台来完成。为适应不同的操作 系统和用户的使用习惯, Lexst 提供了两种模式的控制台:基于字符界面的 Lexst SQL Console 和基于图形界面的 Lexst SQLive。这两种控制台的操作指令是完全一样的。 2.9 全局时间 全局时间是 Lexst 集群统一标准运行时间,以运行的 Top 节点的时间为标准。Lexst 节 点在启动时,需要向自己的上级节点(Home 节点向 Top 运行节点,其它节点向 Home 运行节 点)查询计算机的当前时间,并修正自己主机的时间。由于网络通信延迟原因,允许存在时 间误差,误差控制在秒级内。全局时间对 Lexst 集群非常重要,许多调用、一致性问题、纠 错都需要用到全局时间。用户在运行前,请设置正确的 Top 节点的系统时间。 2.10 元信息 元信息可以理解为是对实体数据的压缩和抽象表达。在Lexst 定义中,元信息包含两种 类型:数据块元信息和节点元信息。前者是对数据实体的索引信息描述。后者是对节点物理 位置、运行状态、所属信息的定义。 按照节点运行规则规定,下级节点向上级节点报告元信息,上级节点保存下级节点的元 信息,即Top 节点保存 Home 节点的元信息,Home 节点保存 Log、Data、Build、Work、Call 节点的元信息。节点元信息由所在节点提供给上级节点后,会不定时进行修正;元信息的有 效期与所属节点运行期同步。 我们对元信息进行了精心设计,使元信息字节量保持在一个尽量小的状态,这样不会对 存储它的节点的内存构成压力。 第3333章 SQLSQLSQLSQL 兼容 SQL 是Lexst 的特色之一。由于大规模数据处理的许多特殊性,并且SQL 很多功能 也不适合大规模数据处理的需要,我们没有完全支持 SQL,而是有选择地进行了调整,只保 留了 SQL 的基本功能。比如大量复杂的 SQL 语句不再被支持;部分 SQL 功能做了简化处理; SQL 的数据操纵语句在这里继续使用;同时,为适应大数据处理的需要,我们还增加了一些 新的语句以满足任务处理需要。这些改变都是在保证系统的安全、高效和可靠性的前提下进 行的,以避免运行过程中可能发生的错误。 www.lexst.com 8 3.1 数据库和不可修改性 Lexst 延续了 SQL 数据库的基础概念。包括允许在一个账号下建立多个数据库;每个数 据库下允许建立多个表;表由字段构成;索引建立在表基础上,表中一个字段可成为主键, 其他字段可以是从键。 与SQL 规范不同的是,Lexst 规定,为防止因为运行过程中因为修改参数可能产生的二 义性问题,所有数据库、表、索引的声明一旦建立,均不允许修改;并且表和索引需要一起 声明。同时,所有查询必须基于索引,非索引字段不提供查询能力。查询范围仅限于单表, 视图(View)、连接查询、嵌套查询都不再被支持。 不可修改性是保证数据结构安全的基石。 3.2 数据命名空间(data naming space) 数据命名空间又称为 “数据空间(data space)”或“命名空间(naming space)”, 每个数据命名空间对应一个表定义,逻辑上用来定位数据所属区域,是表在数据节点上的物 理实现。数据命名空间是一个简单且重要的设计,通过数据命名空间,计算机可以快速地找 到主机上的数据。 3.3 简单数据库类型(simple database type SDBT) 目前一些主流数据库管理系统,数据类型日益丰富,已达数十种之多。这些数据类型虽 然为使用者提供了更多 选择,但是也造成产品的复杂化,对用户的使用也造成一些困扰。 Lexst 出于简单性考虑,简化了数据类型定义,只保留了最基本的数据类型,而对一些复杂 的数据类型,如变长类型、对象类型,采用原始类型可变长实现。另外,为满足多语言环境 下的需要,字符类型则进一步丰富,分别定义了单字符、双字符、宽字符三种字符类型。 数据类型 标识 位 长 范围 字节 binary (raw) 8 0 - 2G 字符类型 字符 char 8 0 - 2G 双字符 nchar 16 0 - 2G 宽字符 wchar 32 0 - 2G 数值类型 短整型 short (smallint) 16 -32768 - 32767 整型 int 32 -2147483648 - 2147483647 长整型 long (bigint) 64 -9223373036854775808 - 9223373036854775807 浮点 float 32 -3.40E+38 - 3.40E+38 双浮点 double 64 -1.79E+308 - 1.79E+308 日期时间 类型 日期 date 32 1年1月1日 - 9999 年12 月31 日 时间 time 32 0时0分0秒0毫秒 - 23 时59 分59 秒999 毫秒 日期时间 datatime (timestamp) 64 1年1月1日0时0分0秒0毫秒 - 9999 年12 月31 日23 时59 分59 秒 999 毫秒 www.lexst.com 9 3.4 运算符 Lexst 目前支持的 SQL 运算符和处理优先级 3.5 SQL语句 (1)数据控制语句 (2)数据定义语句 运算符类型 运算符 含义 比较运算符 = 等于 > 大于 < 小于 >= 大于等于 <= 小于等于 <> 不等于 逻辑运算符 not 非 and 与 between ... and 在某些数据范围内 in 满足多个条件之一 like 模糊查询,匹配特定符串 or 或 赋符运算符 = 对变量赋值 声明 说明 新语句 CREATE USER 用户名 PASSWORD 密码 [maxsize=] 建立用户账号,最大空 间为选项,默认是无限 制。 否 ALTER USER 用户名 PASSWORD 密码 修改用户账号密码 否 DROP USER 用户名 删除用户账号及其下的 所有数据库定义 否 GRANT 权限名 TO 用户名 对账号进行授权 否 REVOKE 权限名 TO 用户名 回收账号授权 否 GRANT 权限名 ON DATABASE 数据库名 TO 用户名 对账号下的某个数据库 进行授权 否 REVOKE 权限名 ON DATABASE 数据库名 TO 用户名 加收账号下的某个数据 库的授权 否 GRANT 权限名 ON 数据库名.表名 TO 用户名 对账号下某个表进行授 权 否 REVOKE 权限名 ON 数据库名.表名 TO 用户名 回收账号下某个表的授 权 否 www.lexst.com 10 (3)数据操纵语句 (4)数据管理语句 声明 说明 新语句 CREATE DATABASE 数据库名 [CHAR=] [NCHAR=] [WCHAR=] [MAXSIZE=] 建立数据库,指定字符 编码和最大空间 否 DROP DATABASE 数据库名 删除数据库 否 CREATE TABLE [Clusters={number|computer address }][PrimeHost={number}] [HostMode={SHARE|EXCLUSIVE}] [ChunkCopy={number}] [ChunkSize={number}M] 数 据库名.表名 (列名 1 [NULL|NOT NULL][NOT CASE][LIKE|NOT LIKE], 列名2 [NULL|NOT NULL][NOT CASE][LIKE|NOT LIKE].....) 建立表 否 CREATE INDEX 数据库名.表名 (列名 1 [primary],列 名2 [primary] ......) 建立索引 CREATE LAYOUT 数据库名.表名 在磁盘上排列各索引列 的顺序 是 DROP TABLE 数据库名.表名 删除表 否 声明 说明 新语句 SELECT * FROM 数据 库 名 .表名 WHERE 列名 1=值 [AND|OR] 列名 2=值 ... 查询数据 否 INSERT INTO(列名 1,列名 2 ...) VALUES(值1,值 2 ...) 单行记录写入 否 DELETE FROM 数据库名.表名 WHERE 列名 1=值 [AND|OR] 列名 2=值 ... 删除数据 否 UPDATE 数据库名.表名 SET 列名 1=新值 WHERE 列名 1=旧值 修改数据 否 DC|ADC FROM naming:[用户命名实例 ] blocks:[列名 % 整数值] query:"[select 语法]" values:"[用户参 数集]"TO naming:[用户命名实例] sites:[正整数] values:"[用户参数集]" COLLECT naming:[用户命名 实例] show:[数据库名.表名] WRITETO:[本地文件名] 分布式计算的 SQL 语 句 , Call 节点发起,作用到 DATA 节点和 WORK 节点 是 声明 说明 新语句 LOAD INDEX 数据库名.表名 [TO 主机地址] 启动内存索引服务 是 STop INDEX 数据库名.表名 [TO 主机地址] 停止内存索引服务 是 LOAD CHUNK 数据库名.表名 [TO 主机地址] 启动内存数据服务 是 STop CHUNK 数据库名.表名 [TO 主机地址] 停止内存数据服务 是 OPTIMIZE 数据库名.表名 [TO 主机地址] 启动数据优化服务 是 BUILD TASK 任务名 [TO 主机地址] Build 节点按照任务名, 是 www.lexst.com 11 3.6 分布式计算语句:DC和ADC 为了简化用户对分布计算的操作过程,和实现 SQL 环境下的分布计算,我们设计了两组 分布 式 计算 语 句 : DC(distributed computing)和 ADC(asynchronous distributed computing)。从3.5 节对 DC/ADC 的声明看,二者在语法上是完全相同的,区别在于面向的 处理任务和处理流程。DC适合小规模数据、轻量级的调用,是一种纯实时的分布计算过程。 ADC 处理过程则复杂很多,主要是用来处理巨型数据(通常有数百兆或者更多),是一种准 同步(包含异步过程)的分布计算。并且 ADC 完成了一项 DC不具备的功能:平衡分布数据。 我们知道,在分布网络环境中,若不考虑计算机性能和网络延迟带来的影响,一次分布计算 的完成时间取决于数据量最大那个节点的计算时间。如果能够把数据平均分配到各节点,那 么它们的计算完成时间也会趋于一致,这对缩短总计算时间,提高分布计算效率非常重要。 第4章 数据块(datadatadatadata chunkchunkchunkchunk) 数据块是 Lexst 唯一的数据存储单元,是记录的实际载体。数据块以文件的形式布署在 数据节点上,同时被频繁地读写、计算和在网络上传送。数据块这种形式十分适合在分布式 网络环境下的数据存储、分发和维持负载平衡。在本章中,我们将详细介绍数据块和它的相 关内容。 4.1 基本说明 物理结构上,一个数据块由任意多行记录组成,并包含了所有需要的验证信息。以类型 区分,数据块有两种:主块(primary chunk)和从块(slave chunk)。在 Data 主节点上存 储的全部是主块,分发到 Data 从节点后变为从块。每个块由 Top 节点提供的、全集群唯一 的64 位数字标识命名。块的尺寸在用户建表时定义,最大不允许超过 2G。任何时间内,集 群每一个数字标识只有一个主块,从块数量不限。在 Data 主节点上,除最后一个块,其他 块都是“填满”的。最后一个块是特殊状态的块,我们称为“缓存块”(cache chunk)。当 缓存块达到用户定义尺寸时,Data 主节点启动压缩过程,将缓存块转为一个标准的数据块。 4.2 跳点数(Tops) 在Linux 中有一个命令: traceroute,在 Windows 中是 tracert,它们都是按照 ICMP 协议的规范,探测两台主机间的路由距离,这个路由距离,被称为“跳点数”。在Lexst 中, 我们做了相同的技术实现。为探测两个数据节点间的路由距离远近,多处使用了跳点数。 4.3 数据读取 在我们的设计中,数据块以文件形式存储在硬盘上,数据必须进入硬盘才会生效。这种 情况下,对数据块的读写需要在硬盘上进行。以数据查询为例,CPU 首先对待查询数据进行 执行 marshal/educe 服 务 www.lexst.com 12 预计算,结果有两类:没有匹配,退出查询;发现数据,定位到硬盘指定扇区读取数据。在 整个过程中,硬盘的读操作占用了绝大部分时间。这种工作方式十分适合早期的小内存计算 机。但是当频繁调用时,硬盘顺序读写的特点将使并行查询变得缓慢。针对这个问题,我们 引入内存存储方案,将数据块导入内存运行,避开硬盘的限制。在目前内存价格不断下降, 容量不断上升的环境下,这种方案显得更加实用。所以,如果您的一台计算机有足够的内存 空间,希望快速访问数据,不妨采用这个方案,把计算机变成一台“内存数据库”,性能会 有巨大提升。内存存储是一种可选模式,使用“load chunk”语句,将把指定数据空间下的 数据块加载到内存中,“sTop chunk”将从内存中释放这些数据块。 4.4 数据写入 数据写入只发生在主 Data 节点的缓存块上。这是 Data 节点上最频密的操作。Lexst 采 用写入数据前并行计算+串行写入设备的方式,将数据追加到硬盘。这种方式最大限度保证 了数据写入的可靠和高效。 同时,数据写入受到严格控制。当发生磁盘写入失败,缓存块将回滚到故障前的文件。 如果回滚失败,将从备份节点复制缓存块,覆盖这个缓存块。 每一次写入完成,调用前端将得到一次完整的操作成功或失败的信息,成功的信息将用 于启动缓存块的备份。 缓存块的存储量达到用户定义的尺寸时,主节点将启动数据压缩过程,这块数据转为" 主块"。主块产生并且被主节点分发到从节点后,缓存块的意义已经消失,这时主节点将启 动删除操作,清除备份的缓存块。 4.5 缓存块备份 每一次缓存块写入成功,主节点立即以同步方式启动备份操作。在缓存块第一次备份时 , 主节点选择一批备份节点 (备份节点目前固定是 2个,循环选择剩余空间最大的两个从节 点),将缓存块已写入域复制到备份节点,以后将重复这个操入。备份节点只检查数据校验 和,不考虑内容本身。 缓存块备份影响后续的写入操作,同时还占用一定量的网络带宽。但从保证数据安全角 度考虑,这个代价是值得的。 4.6 主块备份 主块备份是保证集群可靠和稳定性的关键。 主块的备份过程是:根据用户在建表时定义的快数量,主节点通过"跳点数"确定所有相 关从节点的路由距离,如果备份数是 3,除主块外,还需备份 2个从块。主节点首先检查对 方的磁盘空间,选择路由距离最远和最近的两个从节点,将块发送出去。如果备份数更多, 将循路由距离由近至远的原则分发主块。主块保存到从节点后,从节点对主块做完整性检查 , 如果校验失败,拒绝接收。确认数据完整,"主块标识"改为"从块标识",加入到运行队列。 到此,一次备份过程完成。 4.7 完整性保证 数据内容的正确性由数据完整性来保证。完整性保证是数据系统管理的基本需求之一。 数据块的完整性包含两部分:行记录完整性保证和块完整性保证。完整性校验算法统一采用 www.lexst.com 13 CRC32。完整性校验运算比较耗时,一般只在数据传输结束或对数据内容本身发生疑问时才 使用。行记录完整性校验码产生在行记录写入磁盘前,块完整性校验码产生在“缓存块”转 为“主块”结束时。Data 节点启动时,将对存储的所有数据块进行一次完整性校验,校验 错误的数据块将被正确的数据块替换。 4.8 元信息 数据块元信息是数据块所属表的索引集合。在Data 节点启动时,将汇集这些索引信息, 提供给 Home 节点。Home 节点再进一步分发给需要的节点,如 Build 和Call 节点。这些节 点根据元信息对数据块进行精准定位,从而实现大规模数据计算。数据块元信息是保证大规 模数据计算的基础之一。 4.9 数据更新 数据更新包括删除(SQL delete)和修改(SQL update)两个操作,发生在主块和缓存 块上。每一次数据更新,都按照以下顺序执行: (1)统计块数目,保证主块存在且唯一,检查有效的从块数。 (2)缓存块和主块被全部锁定(锁定以租约的方式进行,超时后将自动解除锁定)。 (3)检查被更新域的数据完整性(所有待删除行的数据完整性)。 (4)执行删除操作,返回被删除行数据。 (5)解除锁定。 (6)如果是修改操作,数据删除后将以追加的方式将新的数据存储到缓存块。 以上执行工作都将以并行方式进行,前三步是准备工作,任何一步中某个节点发生错误 都将退出执行。数据更新工作相当耗时,并且阻止了部分后续任务的执行,尤其当发生大量 更新的情况下,将引发大范围的数据阻塞现象。所以,Lexst 不赞成大规模的数据更新操作 。 4.10 主块同步 数据更新发生后,随即是主块同步操作。Data 主节点将启动数据同步线程,通知相关 的Data 从节点删除数据。从节点收到消息后,进行数据完整性检查,执行主节点相同的删 除操作。与缓存块备份不同的是,主节点启动的主块同步通常是以并行方式进行,同步期内 从块被锁定。这亦会引起一次短时间的数据阻塞现象。 4.11 块数量检查 Home 节点周期检查集群中的块数量,包括主块和相关的从块。如从块数量不足,将启 动“转从”操作,如主块消失,将启动“转主”操作。任何一个转换操作,都是在 Home 节 点锁定下进行。 4.12 主块冲突 Lexst 规定,任何时间下一个集群只允许有一个主块,超出这个范围即是主块冲突。发 生主块冲突的通常情况是:一台主节点宕机,另一台主节点在 Home 节点命令下,从其它从 节点复制了从块,在本地转为主块加入运行队列。随后宕机主节点恢复。这样,在 Home 节 点的数据块队列中就出现两个主块,冲突产生。 解决这一问题的办法是全局时间,Home 节点判断两个主块的最后更新时间, 以最接近 www.lexst.com 14 当前时间的为保留,另一个将做为垃圾块被 Home 节点要求删除。 4.13 清理垃圾数据 数据块运行一段时间,尤其是有大量数据更新发生后,会在计算机上留下大量垃圾数据 , 这些数据除了占用宝贵的磁盘空间,也影响计算机的运算速度。所以这个时候,最好的办法 是用户使用"optimize"语句重新整理数据,回收被占用的磁盘空间。 除此之外,optimize 也可以制成脚本交由系统定期运行。本文后续还有对 optimize 的介绍。 第5章 网络通信协议 Lexst 节点间的调度管理和任务分配都是 通过网络来完成。安全稳定的网络通信是 Lexst 集群正常运行的基本保证。在这套产品中,我们采用 FIXP 协议做为 Lexst 集群的基 础通信协议。FIXP 协议全称是:Free Information eXchange Protocol(自由信息交换协 议),是一套二进制的通信协议,基于 TCP/IP 通信协议,已发展多年,最早应用于互联网的 P2P 交换网络,目前已经衍生了多个版本。现用版本是最新版本的简化版,取消了多处不需 要的定义,只保留了最主要的通信部分。 5.1 流通信 "流通信"建立在 TCP 通信基础之上,主要针对持序的、大流量的数据传输。如数据块的 分发。流通信会占用大量的网络带宽,可以想象,当有数十乃至上百个 Data 节点同时进行 数据块传输时,这种流量足以造成网络阻塞,严重影响集群的正常通信。所以,大流量的数 据传输在 Lexst 是受到限制的,必须在 Home 节点监管下进行。 5.2 包通信 "包通信"建立在 UDP 通信基础之上,主要应用于非持序、非可靠的小流量数据通信。数 据包的尺寸通常在 20 至500 字节之间(IP 包尺寸限制内),以管理包、控制包为主,心跳包 是最常用一种。包通信是 Lexst 中最频密的一种通信方式。 5.3 稳定的包通信(Keep UDP) UDP 的优势是计算机资源占用率低,缺点是数据通信不稳定,存在丢包现象。TCP 恰恰 相反,可以提供稳定的数据通信,但是资源占用率高。如何结合二者的优点而避免其缺点呢? 这个方案的结果就是“稳定的包通信(Keep UDP)”。“稳定的包通信”建立在 UDP 基础之上, 采取模拟 TCP 通信过程的方式,为 UDP 数据提供稳定的通信保证。“稳定的包通信”应用于 通信密度不高,且有一定数据量,需要保持可靠通信过程的领域,如网络日志包的传递。 5.4 远程过程调用(RPC) RPC 是一种历史悠久且非常优秀的网络通信协议,至今仍被广泛使用。它采取隐藏网络 两端通信过程的方式,使不同计算机之间的调用象本地 API 一样方便。 www.lexst.com 15 FIXP 协议也包含对 RPC 的实现,它的通信建立在 TCP、Keep UDP 两种通信类型基础之 上,通过在本地嵌入接口和简化数据通信,实现 RPC 的处理过程。目前节点间许多复杂的、 安全度高的通信都采用 FIXP RPC 进行。 5.5 通信安全 随着网络安全风险日益严重,安全的网络通信变得越发重要。当Lexst 多个集群之间需 要跨越互联网协同工作时,安全的通信保证已经不能忽视。 我们定义了两种安全通信类型:地址验证(检查客户 IP地址是否在服务器规定的可信地 址范围内)和密文验证。其中密文验证算法包括:MD5、SHA1、DES3、RSA。用户可使用其中任 意一种。数据安全通信是一个可选操作,FIXP 服务器默认通信状态是不需要验证。推荐用 户在需要经过互联网通信时,采用密文验证;单个集群内(可以保证安全的内部网络)的通信 , 采用地址验证以减轻服务器压力,或者不验证。 5.6 总结 以上四种通信类型针对不同的需求而设计,并且经过长期测试,现在已经稳定运行,完 全满足了我们对不同任务的需要。在保证安全的情况下,为减轻系统压力、降低资源消耗和 节约网络流量起了关键的作用。 第6章 计算模型 6.1 数据分布计算模型:diffuse/aggregate diffuse/aggregate 是我们设计的一个分布式调度算法,最早应用于关系数据库系统的 分布式数据处理环境中,现在被原封不动迁移到 Lexst。新的分布计算语句 DC和ADC 的理 论基础即来自于 diffuse/aggregate。 diffuse/aggregate 的工作分为两步,首先是数据分区,diffuse 取得数据集合后,按 照用户定义的规则,对数据进行计算或重新排列(如果存储时已做好,可忽略这一步)。完成 数据分区工作后,数据将分发给 aggregate 模块,aggregate 模块依然遵守用户计算规则和 定义,再次对数据做合并或者其它计算,处理结果返回给 diffuse 模块,diffuse 如果有处 理需要,则做进一步处理,没有就发送给调用前端。 物理布局上,diffuse/aggregate 是独立的两个模块。通常 diffuse 位于 Data 节点, aggregate 位于 Work 节点。 6.2 数据聚簇 数据聚簇是对 Data 主节点上的数据做的一个重新排列的过程。排列依据用户建表时的 “create layout”语句中的列声明进行排列(没定义默认按主键以升序排列)。“optimize” 语句执行这个调用。数据聚簇执行中,会产生大量内排序和数据调整,这个过程会比较消耗 时间和内存。数据聚簇结果是将大量零散的相同的数据布署到一起,以达到减少磁盘读取次 数和缩短读取时间的目的。 www.lexst.com 16 6.3 集群数据聚合模型:marshal/educe 如果说数据聚簇是针对单机环境下的数据优化,那么marshal/educe 就是针对集群环境 的数据优化。marshal/educe 操作在 Build 节点进行,由两个环节组成。marshal 是一个负 责对数据的重新组织 ,按照表的主键进行排序; educe 负责将排序后的数据读出。通常, marshal 过程在内存中执行,速度会很快;educe 是磁盘操作,一次将相同主键的数据全部 读出,由于磁盘读取的原因,执行速度会有所降低。相对数据聚簇,marshal/educe 提供了 扩展接口,用户拥有更自由的操作空间,可以在此基础上编写代码优化数据集合,或者生成 其它格式的新数据。关于 marshal/educe 的具体使用将在第 9章说明。 第7章 容错 7.1 故障诊断 容错是 Lexst 主要设计之一。Lexst 通过两部分进行容错处理:故障自检和管理节点监 控。故障自检由故障诊断模块完成。故障诊断模块是一个后台处理程序,与宿主节点绑定在 一起运行。检查范围包括:硬盘、内存、主板、网卡、网络接口。它实时分析和发现计算机 上的错误,结果报告给宿主节点。宿主节点收到故障消息后,采用对外报警、通知管理节点 、 自我屏蔽的方式处理故障。另一方面,Home 节点做为集群的管理者,实时监控着集群中的 所有节点。当收到故障节点的通知,或者发现节点通信失败后,将以隔离的方式屏蔽故障节 点,并通知相关节点取消与此故障节点通信。 目前已经完成的故障诊断子模块包括:硬盘、内存、网络接口,其它子模块的功能待增 加中。容错处理是一项复杂且需要严谨对应的工作,目前容错仍处于调试过程中。当前版本 暂不提供故障诊断服务。 7.2 管理节点故障恢复过程 管理节点包括 Top 节点和 Home 节点,它们通常都有后备节点(没有后备节点我也无能为 力),备份节点保存运行节点的全部元信息。当工作节点通知后备节点故障停止或后备节点 发现工作点宕机失效后,迅速联系相邻管理节点和任务节点,检查工作节点是否正常,如果 失败,管理节点协商,选择一个节点代替故障点,并象所有工作节点发送重注册的通知包, 要求重新注册。 管理节点切换过程很快,通常在数秒内完成,然后就是等待任务节点重新联接,完全接 替故障点工作。 7.3 任务节点故障恢复过程 任务节点在工作过程中,都是处于自维持的状态,除了定时向 Home 节点发送心跳包和 必须的请求,一般不与 Home 节点通信。这种工作模式要求各工作节点通过自我检查软硬件 工作状态来保证可靠运行,目前,通过标准化的诊断机制,可以发现大部分错误。错误发生 后,故障点首先通知 Home 节点,然后以自我屏蔽的方式结束运行。如果工作节点不能实现 自我感知而发生宕机,这个过程由 Home 节点来完成。故障解决后,任务重启需要手工完成。 www.lexst.com 17 第8章 负载 8.1 负载检测 负载检测是 Lexst 另一个主要功能。它保证计算机在可承受的范围内运行而不至于超 载 。Lexst 的负载检测是通过标准化的负载检测模块来完成。负载检测包括:硬盘、内存、CPU、 网络 IO。衡量负载的依据是负载因子。负载因子是一个指标数据,会根据计算机硬件不同 而调整。当超载现象发生时(允许短时间的超载),负载检测模块将向宿主节点报告。宿主 节点把此消息通知 Home 节点。Home 节点根据节点的类型,负责分散压力,调整端到端之间 的任务分配,从而达到平衡集群负载的目的。 由于负载检测模块是与容错诊断模块集成在一起运行的,目前因为容错模块的原因,当 前版本也暂不能提供此项服务。 8.2 影响数据计算的几个问题 8.2.18.2.18.2.18.2.1 硬盘 目前主流硬盘仍然是以机械硬盘为主。结构上,一块硬盘主要由盘片、磁头、电机组成 。 硬盘工作时,电机驱动盘片高速旋转,磁头对盘片进行读写。硬盘的这种机械运动模式决定 了它不可能象内存一样有更高的存取速度提升空间。影响硬盘读写性能的主要有三个因素: 传输速率、缓存、读写过程。前两项在硬盘出厂时已经确定,不会改变;后一项在硬盘运行 过程中产生,通常读的数据量远远高于写数据量,顺序读写的性能又高于随机读写。 为提高硬盘的读写性能,Lexst 按照硬盘的这种特性进行了优化。但是由于硬盘的物理 性质,这种优化对提升硬盘的读写性能仍然有限。从我们目前的分布计算试验结果来看,当 进行大数据的分布计算时,硬盘的数据读取时间会占用时间总量的 20%至40%。所以,如果 希望实现更高效的数据读写,采用新型硬盘,如 SSD 硬盘是一个不错的选择。 8.2.28.2.28.2.28.2.2 网络带宽 网络带宽是另一个影响分布计算的主要问题,对数据传输效率影响巨大,即使在局域网 中也是如此。我们曾做过一个测试,使用 80 台服务器持续分发数据块,检查会给网络带来 什么影响。当我们在千兆全双工以太网络环境下的进行试验时,基本可以实现数据块无延时 分发。网络带宽改为百兆时,数据延时大大增加。我们没有试验十兆网络,估计结果会慢到 不能容忍的地步!所以,为了保证系统能够快速运行,请选择足够的网络带宽。 8.2.38.2.38.2.38.2.3 移动计算代替移动数据计算 在网络环境下,一台计算机将一个大的数据集合经过计算后转换为一个小的数据集合, 并将计算结果发送给调用方,这种数据处理方式被称为移动计算。移动数据计算则是数据原 封不动地发送给调用方,由调用方完成计算过程。两者对比,移动计算对降低网络负载、提 高计算效率方面明显优于移动数据计算。所以,在软件产品的设计和开发过程中,我们应该 www.lexst.com 18 更多考虑采用移动计算而不是移动数据计算处理数据。 8.2.48.2.48.2.48.2.4 数据平衡 分布计算环境中的数据量平衡是一个重要且未被充分关注的隐性瓶颈问题。如果被计算 的数据量不多,这个问题并不明显示,也不会引起特别关注。只有当数据量达到一定额度, 这个问题才会显现且变得十分重要。针对这个问题,我们目前已经有了解决方案,答案就是 ADC。如果你有大型数据需要处理,请用它来完成。 第9999章 启动和配置 9.1 启动 对任何一位没有使用过 Lexst 的普通用户来说,运行Lexst 都是一件非常轻松的事。首 先,需要从 www.lexst.com 网站下载软件包,用“tar -zxvf lexst_app_0.10.tar.gz”命 令解压,所有节点文件将解压到硬盘上。在每个节点的 bin 目录下,都有三个 sh 文件: node.sh、run.sh、stop.sh。用 vi 或者其它文本编辑器打开 node.sh,找到“JAVA_HOME”, 将“=”号后的 java 根目录改为当前实际路径,保存并退出 node.sh。JRE 版本要求是 1.6 或者以上版本。然后以根用户(root)身份在 bin 目录下键入“./run.sh”,一个节点将进 入后台运行。键入“./stop.sh”,节点任务则退出。Lexst 正常启动顺序是先启动“Top、Home、 Log”三个节点,退出过程相反。其它节点的启动和退出无先后顺序。 9.2 配置 9.2.19.2.19.2.19.2.1 基本配置 在所有节点的 bin 目录下,都有一个 local.xml 文件。这是节点的基本配置文件。每个 文件内都有五段相同的配置: 192.168.1.100 8000 8000 localhost 5222 5222 www.lexst.com 19 192.168.1.200 192.168.1.201 192.168.1.202 debug yes none /lexst/log/data 20 10 15 第1段是上级节点的通信地址,在这里上级节点是 Home 节点,只有 Top 节点没有上级 地址。第 2段是当前节点的服务器监听地址,如果本地 IP地址是 “localhost”或者是 “127.0.0.1”,节点在启动时将绑定一个实际 IP地址。第 3段是安全通信文件路径,FIXP 服务器在启动时将加载这个文件,按照文件中的定义选择安全通信模式。如果运行节点收到 来自其它节点的关闭指令,它将检查发出通知的节点是不是在第 4段的 IP地址范围内,如 果在这个范围内,运行节点将停止服务并退出运行状态。第5段是日志配置,各参数说明见 下表。 9.2.29.2.29.2.29.2.2 算法配置 另一个重要配置是数据分布算法和聚合算法所属任务类的配置。在 Data、Work、Build 节点的 bin 目录下,都有一个 task.xml 文件,其中 Data 节点被分配处理 diffuse 算 法 ,Work 关键字 说明 level日志级别,顺序依次是:debug,info,warning,error,fatal console-print是否在终端上打印日志信息,关键字是“yes”或者“no” send-mode日志发送模式,分别是: none(不发送 )、file(写入本地文件 )、 server(发送到日志服务器) directory本地日志存储目录,如果 send-mode 是file,directory 生效,日志 数据将写入这个目录下 filesize日志文件最大尺寸,如果send-mode 是file,filesize 生效,单位是 兆(M) buffer-size日志数据在内存中的最大缓存空间,单位是 K send-interval日志发送间隔,如果send-mode 是file 或者 server,send-interval 生效,日志数据将写入磁盘或者发送到日志服务器。单位是秒。 www.lexst.com 20 节点处理 aggregate 算法,Build 节点负责 marshal/educe 算法。虽然它们针对的任务各不 相同,但是配置格式是一致的。格式如下: org.search.SearchProject diffuse-sample org.search.SearchTask project-class 是所属任务信息的类文件,这个类文件保存“naming、task-class、resource” 三个信息,需要从 com.lexst.util.naming.Project 类派生。naming 是所属任务命名,命 名由用户任意定义,忽略大小写,但是必须保证是集群唯一,否则会引起混乱。task-class 是执行任务的类对象名称,类对象将分别从“com.lexst.algorithm.diffuse.DiffuseTask、 com.lexst.algorithm.aggregate.AggregateTask 、 com.lexst.algorithm.marshaleduce.BuildTask”三个类中派生。 resource 保存用户自定 义的任意信息,数据内容由 project-class 指定的类进行解析。 第10章 一个分布式搜索引擎实现 这是一个分布式搜索的实验案例,在这里,我们将展示一个搜索引擎的完整实现。实验 由两部分组成,硬件环境包括: 由17 台个人计算机组成的一个集群,每台机器主要配置如下: 操作系统:Fedora Linux 5.0 Java 运行库:JRE 1.6 CPU: Pentium III 1.0G - Pentium IV 1.7G 硬盘:40G - 500G 内存:256M - 2G 网络连接:100M 其中 Top、Home、Log 共用一台服务器,Data 节点服务器 9台(三台主节点,六台从节 点 ),Work 节点 3台,Build 节点 3台,Call 节点 1台。各节点已布署对应的软件模块,并 且已经启动和运行了它们。 编码实现上,我们需要搜索集群中的全部数据。被搜索的数据分为两部分,一类是“即 时搜索”,即针对那些进入集群存储但没有形成“chunk”的数据,这类数据的数量相对要小。 另一类是已经形成“chunk”且数量很大的数据,为了更快地搜索这类数据,我们将它们转 换为计算机能够快速处理的格式并存储它们,以实现在小集群处理大数据的目的。 数据来源方面,我们假定您已经拥有了一批文档,并在这些文档基础上,实现了分词、 词频统计、词权重分析定义,词定义有三个元素: word (nchar 类型)词条。 documentId ( int 类型)词条所属文档 ID,全局唯一。 weight (short 类型)词权重,数值越大权重越高。 www.lexst.com 21 10.1 数据库和表定义 我们需要定义一个数据库和其下的三个表。数据库名称为“engine”,这个数据库的字 符全部采取标准的 UTF 编码。三个表分别保存词条、压缩的词条集合和文档数据,表名称分 别是“words、zipwords、documents”。“words”表针对上述的“即时搜索”,“zipwords” 表针对大数据搜索。 (1) 数据库定义 (2) 词库表定义 (3) 压缩词库表定义 (4) 文档库表定义 启动 Lexst SQLive(图形控制台)或者 Lexst SQL Console(字符终端控制台),以管 理员或者注册用户身份登录,将以上定义录入,Lexst 将完成在集群中建立数据库和表,并 为每个表分配命名空间的工作。 10.2 上传数据 在Lexst 集群中,Data 节点负责数据管理工作,但是它不接受对外的服务,所有 数据 处理必须通过 Call 节点完成。所以,我们只需要针对 Call 节点编写代码即可。上传包括两 部分:上传词条和上传文档。具体代码如下: (1)上传词条 create database engine char=UTF8 nchar=UTF16 wchar=UTF32 create table engine.words clusters=1 PrimeHost=3 HostMode=Share ChunkCopy=3 ChunkSize=64M (word nchar, documentId int, weight short) create index engine.words (word primary) create layout engine.words (word asc) create table engine.zipwords Clusters = 1 PrimeHost=3 HostMode=Share ChunkCopy=3 ChunkSize=64M (word nchar , hash int , data raw ) create index engine.zipwords ( word primary ) create layout engine.zipwords ( word asc) create table engine.documents Clusters=1 PrimeHost=3 HostMode=Share ChunkCopy=3 ChunkSize=64M ( documentId int , title nchar , content nchar) create index engine.documents ( document primary) create layout engine.documents ( documentId asc) www.lexst.com 22 函数说明: 1. WordItem 是一个类定义,包含三个参数:词条(word),文档 ID(documentId),关键 字权重(weight) 2. table 保存 engine.words 的属性信息 3. ip, port 分别是 Call 节点的 TCP 地址和端口 4. CallClient 负责连接到 Call 节点,将Inject 中的数据批量上传到 Call 节点,Call 节点负责将数据存储到 Data 节点,并返回存储的条目数。 (2)上传文档 上传文档的操作过程与上传词条的过程相同,在此我们就不再重复了。各位可以尝试编 写这段代码。 10.3 编写marshal/educe engine.zipwords 是一个保存相同词条合并结果的表,数据来 源于 engine.words 表, publicpublicpublicpublicvoidvoidvoidvoiduploadWord(Listarray,Tabletable,String ip,intintintintport)throwsthrowsthrowsthrowsIOException{ Injectinject=newnewnewnewInject(table); UTF16utf16=newnewnewnewUTF16(); FieldwordField=table.find("word"); FielddocIdField=table.find("documentId"); FieldweightField=table.find("weight"); forforforfor(WordItemitem:array){ Rowrow=newnewnewnewRow(); bytebytebytebyte[]b=utf16.encode(item.getWord()); row.add(newnewnewnewNChar(wordField.getColumnId(),b)); row.add(newnewnewnew Int(docIdField.getColumnId(), item.getDocumentId())); row.add(newnewnewnew Small(weightField.getColumnId(), item.getWeight())); inject.add(row); } CallClientclient=newnewnewnewCallClient(truetruetruetrue); SocketHostremote=newnewnewnewSocketHost(SocketHost.TCP,ip,port); client.connect(remote); intintintintitems=client.inject(inject,falsefalsefalsefalse); System.out.printf("insertcount:%d\n",items); client.close(); } www.lexst.com 23 zipwords 表有三个字段:word、hash、data。word 与engine.words 表的 word 对应。hash 是documentId%module 的结果值(模值必须固定)。data 是 字 节 数 组 ,保存一 组“documentId + weight”的集合, 在转成为字节数组前, 按照权重字段( weight)做降序排序。在进行 marshal/educe 计算前,我们需要从 com.lexst.build.task.BuildTask 类派生实现子类并 布署在Build 节点下。Build 节点收到“build task”指令后,启动并运行它们。这个子类 的工作是从各 Data 节点取来 engine.words 表的数据并加载它们,以实现在本地快速计算。 这个类的主要代码包括三部分: (1) marshal 实现 marshal 由Install(jni 接口)类处理,它的工作是将相同的词条整理归类。 完成后返 回两个 long 型,values[0]是所处理数据集的字节总量,values[1]是处理的记录总数。 (2) educe 实现 educe 负责提取 marshal 整理的数据,这是一个读磁盘的过程。educe 将相同词条记录 一次性取出。 (3) compress 函数实现 compress 函数是对 educe 取出的 engine.words 表数据进行解析,记录转换为Row 类, 再从 Row 类中取出各列数据,在内存中将相同词条合并,转换为 engine.zipwords 表需要的 数据,最后写入磁盘,成为 engine.zipwords 的记录。请注意 WordSet 类中的"int hash = documentId % HASH_SIZE;",通过这行代码,我们将同名关键字实现了分组,hash 范围是 0 至HASH_SIZE-1,为数据分布做预处理。hash做为分割值存入engine.zipwords表 。HASH_SIZE 是一个固定正整数值。在后面的 diffuse 还有相同的操作。 bytebytebytebyte[]db="engine".getBytes(); bytebytebytebyte[]words_table="words".getBytes(); longlonglonglong[]values=Install.marshal(db,words_table); whilewhilewhilewhile(truetruetruetrue){ bytebytebytebyte[]data=Install.educe(db,words_table,10485760); ifififif(data==nullnullnullnull)breakbreakbreakbreak; compress(zipwords_table,data); } privateprivateprivateprivatevoidvoidvoidvoidcompress(Tablezipwords_table,bytebytebytebyte[]data){ Mapmap=newnewnewnewTreeMap(); UTF16utf16=newnewnewnewUTF16(); forforforfor(intintintintoff=0;offmapSet=newnewnewnewTreeMap(); www.lexst.com 25 ItemSet 类 Item 类 publicpublicpublicpublicvoidvoidvoidvoidadd(intintintintdocumentId,shortshortshortshortweight){ intintintinthash=documentId%HASH_SIZE; ItemSetset=mapSet.get(hash); ifififif(set==nullnullnullnull){ set=newnewnewnewItemSet(); mapSet.put(hash,set); } set.add(documentId,weight); } publicpublicpublicpublicSetkeySet(){ returnreturnreturnreturnmapSet.keySet(); } publicpublicpublicpublicItemSetget(intintintinthash){ returnreturnreturnreturnmapSet.get(hash); } } classclassclassclassItemSet{ ArrayListarray=newnewnewnewArrayList(); publicpublicpublicpublicvoidvoidvoidvoidadd(intintintintdocumentId,shortshortshortshortweight){ array.add(newnewnewnewItem(documentId,weight)); } publicpublicpublicpublicvoidvoidvoidvoidsort(){ java.util.Collections.sort(array); } publicpublicpublicpublicbytebytebytebyte[]build(){ ByteArrayOutputStreamout=newnewnewnewByteArrayOutputStream(); forforforfor(Itemblock:array){ bytebytebytebyte[]b=Numeric.toBytes(block.doucmentId); out.write(b,0,b.length); b=Numeric.toBytes(block.weight); out.write(b,0,b.length); } returnreturnreturnreturnout.toByteArray(); } } www.lexst.com 26 通过 marshal/educe 处 理 ,我们将数据存入了 engine.zipwords 表 。这样 ,engine.words 表将用于少量的最新数据的“即时搜索”。engine.zipwords 表搜索已经缩压的数据。二者 的数据结果将在 Work 节点共同处理。 10.4 编写diffuse实现类 实现 diffuse 算法之一,是从com.lexst.algorithm.diffuse.DCTask 类派生子类并布 署到 Data 节点下。在这里,我们分别为“engine.words”和“engine.zipwords”两个表实 现两个子类,为数据分发到 Work 节点做准备。DCTask 类的抽象函数“execute”负责计算 数据并完成数据分组工作,数据分发由 Data 节点执行。 (1)engine.words 表的 execute 函数 @Override publicpublicpublicpublicDiffuseResult[]execute(DCdc,Tablewords_table,bytebytebytebyte[] data){ java.util.Listlist=dc.listToAddress(); intintintintsize=list.size(); Mapmap=newnewnewnewTreeMap(); UTF16utf16=newnewnewnewUTF16(); forforforfor(intintintintoff=0;off{ intintintintdoucmentId;shortshortshortshortweight; publicpublicpublicpublicItem(intintintintdocumentId,shortshortshortshortweight){ thisthisthisthis.doucmentId=documentId; thisthisthisthis.weight=weight; } publicpublicpublicpublicvoidvoidvoidvoidadd(shortshortshortshortweight){ thisthisthisthis.weight+=weight; } @Override publicpublicpublicpublicintintintintcompareTo(Itemarg){ ifififif(weightarg.weight)returnreturnreturnreturn-1; returnreturnreturnreturn0; } } www.lexst.com 27 WordResult 类 intintintintlen=row.resolve(words_table,data,off); off+=len; bytebytebytebyte[]b=((NChar)row.get((shortshortshortshort)1)).getValue(); Stringword=utf16.decode(b); intintintintdocumentId=((Int)row.get((shortshortshortshort)2)).getValue(); shortshortshortshortweight=((Small)row.get((shortshortshortshort)3)).getValue(); intintintinthash=documentId%HASH_SIZE; intintintintmod=hash%size; Groupgroup=map.get(word); ifififif(group==nullnullnullnull){ group=newnewnewnewGroup(); map.put(word,group); } group.add(mod,documentId,weight); } WordResult[]results=newnewnewnewWordResult[size]; forforforfor(intintintinti=0;imap=newnewnewnewTreeMap(); publicpublicpublicpublicvoidvoidvoidvoidadd(intintintintmod,intintintintdocumentId,shortshortshortshortweight){ ByteArrayOutputStreambuff=map.get(mod); ifififif(buff==nullnullnullnull){ buff=newnewnewnewByteArrayOutputStream(); map.put(mod,buff); } bytebytebytebyte[]b=Numeric.toBytes(documentId); buff.write(b,0,b.length); b=Numeric.toBytes(weight); buff.write(b,0,b.length); } publicpublicpublicpublicSetkeySet(){ returnreturnreturnreturnmap.keySet(); } publicpublicpublicpublicbytebytebytebyte[]get(intintintintmod){ ByteArrayOutputStreambuff=map.get(mod); returnreturnreturnreturnbuff.toByteArray(); www.lexst.com 29 (2)engine.zipwords 表的 DiffuseTask 子类 10.5 编写aggregate实现类 aggregate 实现与 diffuse 类似,可从com.lexst.algorithm.aggregate.DCTask 类派 } } @Override ppppublicublicublicublic DiffuseResult[] execute(DC dc, Table zipwords_table, bytebytebytebyte[] data) { java.util.List list = dc.listToAddress(); intintintint size = list.size(); WordResult[] results = newnewnewnew WordResult[size]; forforforfor (intintintint i = 0; i < size; i++) { results[i] = newnewnewnew WordResult(list.get(i)); } UTF16 utf16 = newnewnewnew UTF16(); forforforfor (intintintint off = 0; off < data.length;){ Row row = newnewnewnew Row(); intintintint len = row.resolve(zipwords_table, data, off); off += len; NChar nchar = (NChar) row.get((shortshortshortshort) 1); bytebytebytebyte[] words = utf16.decode(nchar.getValue()).getBytes(); intintintint hash = ((Int) row.get((shortshortshortshort) 2)).getValue(); intintintint mod = hash %HASH_SIZE; bytebytebytebyte[] raws = ((Raw) row.get((shortshortshortshort) 3)).getValue(); bytebytebytebyte[] word_size = Numeric.toBytes(words.length); bytebytebytebyte[] raws_size = Numeric.toBytes(raws.length); results[mod].add(word_size, 0, word_size.length); results[mod].add(words, 0, words.length); results[mod].add(raws_size, 0, raws_size.length); results[mod].add(raws, 0, raws.length); } returnreturnreturnreturn results; } www.lexst.com 30 生子类并布署在 Work 节点下。DCTask 子类要处理两个抽象函数:“inject”函数保存来自 diffuse 算法提供的数 据 ,“execute”函数汇总并计算这些数据,结果以分组形式发送回各 Data 节点,最终完成一次分布计算过程。出于简化程序考虑,“execute”函数在这里只处 理词条全匹配一种情况。 DTask 派生类:WordTask importimportimportimportjava.io.ByteArrayOutputStream; importimportimportimportjava.io.IOException; importimportimportimportjava.util.ArrayList; importimportimportimportjava.util.HashMap; importimportimportimportjava.util.List; importimportimportimportjava.util.Map; importimportimportimportjava.util.TreeMap; importimportimportimportcom.lexst.db.statement.DCValue; importimportimportimportcom.lexst.fixp.Command; importimportimportimportcom.lexst.fixp.Packet; importimportimportimportcom.lexst.fixp.Response; importimportimportimportcom.lexst.fixp.Stream; importimportimportimportcom.lexst.util.Numeric; importimportimportimportcom.lexst.util.lock.SingleLock; importimportimportimportcom.lexst.algorithm.aggregate.*; publicpublicpublicpublicclassclassclassclassWordTaskextendsextendsextendsextendsDCTask{ privateprivateprivateprivateSingleLocklock=newnewnewnewSingleLock(); privateprivateprivateprivateintintintintall,count; privateprivateprivateprivateListarray=newnewnewnewArrayList(); privateprivateprivateprivateMapmapStream=newnewnewnew HashMap(); publicpublicpublicpublicWordTask(){ supersupersupersuper(); thisthisthisthis.all=0; thisthisthisthis.count=0; } @Override publicpublicpublicpublicvoidvoidvoidvoidinject(DCPairobject){ ifififif(object.isStream()){ inject_stream(object); www.lexst.com 31 }elseelseelseelseifififif(object.isPacket()){ inject_packet(object); } } privateprivateprivateprivatevoidvoidvoidvoidinject_stream(DCPairobject){ Streamrequest=(Stream)object.getRequest(); bytebytebytebyte[]data=nullnullnullnull; trytrytrytry{ data=request.readContent(); }catchcatchcatchcatch(IOExceptionexp){ } thisthisthisthis.flush(object,data); } privateprivateprivateprivatevoidvoidvoidvoidinject_packet(DCPairobject){ Packetrequest=(Packet)object.getRequest(); bytebytebytebyte[]data=request.getData(); flush(object,data); } privateprivateprivateprivatevoidvoidvoidvoidflush(DCPairobject,bytebytebytebyte[]data){ intintintintsites=dc.getDefineFromSites(); lock.lock(); trytrytrytry{ ifififif(data!=nullnullnullnull&&data.length>0){ intintintintoff=0; intintintintword_size=Numeric.toInteger(data,off,4); off+=4; Stringword=newnewnewnewString(data,off,word_size); off+=word_size; intintintintraws_size=Numeric.toInteger(data,off,4); off+=4; intintintintleft=data.length-off; ByteArrayOutputStreambuff=mapStream.get(word); ifififif(buff==nullnullnullnull){ buff=newnewnewnewByteArrayOutputStream(left*sites); mapStream.put(word,buff); } buff.write(data,off,left); www.lexst.com 32 } ifififif(all==0)all=sites; array.add(object); count++; }catchcatchcatchcatch(Throwableexp){ }finallyfinallyfinallyfinally{ lock.unlock(); } ifififif(count==all){ thisthisthisthis.into(); } } @Override publicpublicpublicpublicvoidvoidvoidvoidexecute(){ DCValuevalue=dc.findToValue("wordsize"); ifififif(value==nullnullnullnull||value.longValue()!=mapStream.size()){ Commandcmd=newnewnewnewCommand(Response.NOTFOUND); Streamresp=newnewnewnewStream(cmd); forforforfor(DCPairobject:array){ object.setResponse(resp); object.finish(); } returnreturnreturnreturn; } ItemSetset=nullnullnullnull; forforforfor(Stringword:mapStream.keySet()){ ByteArrayOutputStreambuff=mapStream.get(word); bytebytebytebyte[]b=buff.toByteArray(); ifififif(set==nullnullnullnull){ set=newnewnewnewItemSet(); forforforfor(intintintintoff=0;offa=newnewnewnewArrayList(set.map.values()); java.util.Collections.sort(a); intintintintelements=a.size()/array.size(); ifififif(a.size()%array.size()!=0)elements+=1; intintintintbegin=0; forforforfor(DCPairobject:array){ ifififif(begin>=a.size()){ Commandcmd=newnewnewnewCommand(Response.NOTFOUND); Streamresp=newnewnewnewStream(cmd); object.setResponse(resp); object.finish(); continuecontinuecontinuecontinue; } intintintintend=(begin+elements>=a.size()?a.size():begin +elements); Listlist=a.subList(begin,end); begin=end; ByteArrayOutputStreamout = newnewnewnew ByteArrayOutputStream(list.size()*6); forforforfor(Itemitem:list){ bytebytebytebyte[]b=Numeric.toBytes(item.documentId); out.write(b,0,b.length); b=Numeric.toBytes(item.weight); out.write(b,0,b.length); } Commandcmd=newnewnewnewCommand(Response.ACCEPTED); Streamresp=newnewnewnewStream(cmd); resp.setData(out.toByteArray()); object.setResponse(resp); object.finish(); } } www.lexst.com 34 ItemSet 类 10.6 搜索请求 前面说过,Call 节点是属于“桥梁”节点,所有外部请求必须通过 Call 节点完成处理 。 Call 有两种运行模式:做为独立应用程序运行和绑定在 WEB 服务器上运行。类似这样的搜 索服务,推荐将 Call 节点绑定在 Tomcat 服务器下运行,编码在 Servlet 中完成,这样可以 很方便调用 Call 节点包的函数。下面的搜索请求在“search”函数实现,是通过 dc 语句分 别对“engine.words”和“engine.zipwords”两个表执行搜索,并返回搜索结果。至于数 据以什么样的形式显示,留给用户自己决定。 @Override publicpublicpublicpublicvoidvoidvoidvoidnaming(bytebytebytebyte[]arg0){ //TODOTODOTODOTODOAuto-generatedmethodstub } } classclassclassclassItemSet{ Mapmap=newnewnewnewTreeMap(); publicpublicpublicpublicvoidvoidvoidvoidadd(Itemitem){ map.put(item.documentId,item); } publicpublicpublicpublicvoidvoidvoidvoidAND(ItemSetset){ ArrayLista=newnewnewnewArrayList(map.size()); forforforfor(Itemitem:map.values()){ Itemother=set.map.get(item.documentId); ifififif(other==nullnullnullnull){ a.add(item.documentId); }elseelseelseelse{ item.add(other.weight); } } forforforfor(intintintintdocumentId:a){ map.remove(documentId); } } } www.lexst.com 35 publicpublicpublicpublicListsearch(String[]words){ StringBuilderwords_query=newnewnewnewStringBuilder(); StringBuilderzipwords_query=newnewnewnewStringBuilder(); forforforfor(intintintinti=0;i0){ words_query.append("OR"); zipwords_query.append("OR"); } Strings=String.format("word=\'%s\'",words[i]); words_query.append(s); zipwords_query.append(s); } Stringwords_select= String.format("select* from engine.wordswhere%s",words_query.toString()); Stringzipwords_select= String.format("select* from engine.zipwordswhere%s",zipwords_query.toString()); Stringwords_dc=String.format( "dcfromnaming:diffuse_wordsquery:\"%s\"to naming:aggregate_wordsvalues:\"wordsize=%d\"", words_select,words.length); Stringzipwords_dc=String.format( "dcfromnaming:diffuse_zipwordsquery:\"%s\"to naming:aggregate_wordsvalues:\"wordsize=%d\"", zipwords_select,words.length); Spacewords_space=newnewnewnewSpace("engine","words"); Spacezipwords_space=newnewnewnewSpace("engine","zipwords"); SQLCharsetcharset = com.lexst.Call.WebLauncher.getInstance().findCharset("engine"); Table words_table= com.lexst.Call.WebLauncher.getInstance().findTable(words_space); Table zipwords_table= com.lexst.Call.WebLauncher.getInstance().findTable(zipwords_space) ; SQLParserparser=newnewnewnewSQLParser(); DCwords_obj=parser.splitDC(words_dc,charset,words_table); DCzipwords_obj= parser.splitDC(zipwords_dc,charset, zipwords_table); words_obj.setIdentity(nextIdentity()); zipwords_obj.setIdentity(nextIdentity()); www.lexst.com 36 longlonglonglong[] chunkIds = com.lexst.Call.pool.BuildPool.getInstance().findBuildChunk("diffu se_zipwords"); ByteArrayOutputStreamout=newnewnewnewByteArrayOutputStream(); bytebytebytebyte[] b1 = com.lexst.Call.pool.DataPool.getInstance().dc(words_obj,chunkIds); ifififif(b1!=nullnullnullnull&&b1.length>0)out.write(b1,0,b1.length); bytebytebytebyte[] b2 = com.lexst.Call.pool.DataPool.getInstance().dc(zipwords_obj); ifififif(b2!=nullnullnullnull&&b2.length>0)out.write(b2,0,b2.length); ArrayListarray=newnewnewnewArrayList(); bytebytebytebyte[]data=out.toByteArray(); ifififif(data!=nullnullnullnull&&data.length>0){ forforforfor(intintintintoff=0;off=0xffffff)number=1; value=((value<<24)|number++); returnreturnreturnreturnvalue; } www.lexst.com 38 11.2 计算模型 在目前版本中,Lexst 提供了几种数据处理算法,解决了大数据计算的一些基本问题, 但是这些还不能满足数据处理日益多样化的需求。未来,我们希望有更多的优秀的算法加入 其中,为丰富数据处理算法,提高数据处理效率提供更多的选择。 11.3 更多数据库功能 Lexst 目前对 SQL 的支持有限,尤其是一些重要功能依然欠缺,如事务处理、连接查询 等。我们正在做这方面的尝试,希望在保持可靠性和简单基础上实现这些的目标。 11.4 多核CPU编程 计算机业公认的一个事实:移动计算的效率比移动数据的效率高,多核芯片计算效率比 单核计算效率高。围绕着 Lexst 各类模块,如何分解内部功能,将串行计算改为并行计算, 我们研发小组已展开多种可行性研究。待取得实质进展,我们会公布这方面的研究成果。 11.5 低功耗硬件 不可否认,通过软件容错,大量低端硬件代替高端硬件后,用户的采购成本得到很大下 降。但另一个问题依然没有解决:由于市场上大量各类 PC和服务器采用的都是 X86 芯 片 , 而这些年 X86 芯片的功耗并没有因为技术升级而有实质降低。体现在 IDC 运营商上,他们的 运营成本没有下降,有些还呈上升趋势。Lexst 希望在这些方面做些工作:针对非 X86 体系 的高性能低功耗 CPU 编程,如ARM 芯片。尽管目前市场上这类芯片不多,但我们已决定这样 做了! 11.6 多版本 严格地说,目前 Lexst 只能运行在 Linux 平台上,对Windows 平台只是部分支持,离应 用还有很大一段距离。所以,提供基于 Lexst for Windows 是我们的下个目标之一。 结束语 相较许多发展成熟的并行存储/计算系统,Lexst 正式开发期很晚,目前已经落后很多。 但是从另一角度讲,这也让我们看见了前人设计的一些缺点和不足。所以在产品设计中,我 们也在尽量避免这种缺憾。但同时有许多未知、未定的问题也是无法回避的,如非结构化数 构的支持,各种算法的调用,新硬件的优化。这些都需要同第三方厂商厂合作。我们希望有 更多的伙伴加入我们的团队,共同完成这个目标。如果你有这个意愿,请加入我们!

下载文档,方便阅读与编辑

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享文档获得金币 ] 0 人已下载

下载文档

相关文档