JGroups入门 - 实现简单的基于文本的聊天程序
这篇文章的目的就是写一个简单的基于文本的聊天程序(SimpleChat),包含如下功能:
- 所有 SimpleChat 的实例通过一个组来查找彼此
- 无需运行一个中央的聊天服务器来供 SimpleChat 连接,因此也就没有单点故障
- 聊天信息将发送给组中的所有成员
- 当某个实例离开或者加入组或者崩溃时,其他的成员会收到通知
- (可选) 我们维护一个公用的组范围内的状态分享,例如聊天历史记录。新的实例可从已有的实例处获取这些历史记录
JGroups 概要
JGroups 使用 JChannel 作为连接到组、发送和接收消息的主 API,并可通过 JChannel 注册用来处理这些事件(成员加入、退出和发送消息)的侦听器。
而 Messages 是发送的消息,它包含一个字节缓冲区、发送和接受者地址。Addresses 是 org.jgroups.Address 的子类,通常包含一个 IP 地址和端口。
组中的实例列表被成为 View,每个实例包含相同的 View,可通过 View.getMembers() 来获取所有实例的地址列表。
实例 Instances 只能在加入组后才能发送和接收消息。
当一个实例要离开组时,JChannel.disconnect() 或者 JChannel.close() 方法会被调用,后者实际上会判断当连接还保持时调用了 disconnect() 方法来关闭通道。
创建并加入组通道
要加入一个组,我需要使用 JChannel。一个 JChannel 的实例可以通过一个配置来创建,配置中定义了通道的属性。然后通过 connect(String clustername) 来连接到组中。所有通道实例都是调用 connect() 并使用相同的参数来加入相同的组中。下面让我们实际创建一个 JChannel 并连接到名为 ChatCluster 的组中:
import org.jgroups.JChannel; public class SimpleChat { JChannel channel; String user_name=System.getProperty("user.name", "n/a"); private void start() throws Exception { channel=new JChannel(); // use the default config, udp.xml channel.connect("ChatCluster"); } public static void main(String[] args) throws Exception { new SimpleChat().start(); } }
首先我们使用空的构造器来创建一个 JChannel 实例,该方法使用默认的配置。你也可以传递一个 XML 文件来配置这个 JChannel,例如 new JChannel("/home/bela/udp.xml").
connect() 方法加入 ChatCluster 组中。注意我们并不需要事先明确的创建一个组,connect() 方法会判断如果是组中的第一个实例时自动创建该组。之后其他的实例连接过来就加入了相同的组,例如:
- ch1 joining "cluster-one"
- ch2 joining "cluster-two"
- ch3 joining "cluster-two"
- ch4 joining "cluster-one"
- ch5 joining "cluster-three"
这样我们就有三个组:"cluster-one" 包含 ch1 和 ch4, "cluster-two" 包含 ch2 和 ch3, 而 "cluster-three" 只有一个 ch5 实例.
主事件循环和发送聊天消息
现在我们运行一个事件循环来从标准控制台输入中读取文本消息,然后发送到组中所有成员。当输入 exit 或者 quit 命令时,将会退出循环并关闭通道
private void start() throws Exception { channel=new JChannel(); channel.connect("ChatCluster"); eventLoop(); channel.close(); } private void eventLoop() { BufferedReader in=new BufferedReader(new InputStreamReader(System.in)); while(true) { try { System.out.print("> "); System.out.flush(); String line=in.readLine().toLowerCase(); if(line.startsWith("quit") || line.startsWith("exit")) break; line="[" + user_name + "] " + line; Message msg=new Message(null, null, line); channel.send(msg); } catch(Exception e) { } } }
在这里添加了 eventLoop() 的调用,然后关闭通道的方法。
事件处理循环当输入回车时就会发送消息到组中,这是通过构造一个新的 Message 实例然后调用 Channel.send() 方法来发送。
Message 类构造器的首个参数是目标地址,该参数设置为 null 表示将发送给组中所有成员(非空的参数表示要发送到指定的某个成员)。
第二个参数是发送者的地址,这也是 null,JGroups 会自动使用当前的恰当的地址。
第三个参数是我们从标准控制台输入中读到的键盘输入的内容,这个内容会通过 Java 的序列化机制变成 byte[] 数据并设置为 Message 的内容。注意我们也可以自己来序列化一个对象(而且也推荐这样做),然后给 Message 构造器第三个参数传递 byte[] 值。
到这里应用程序功能就已差不多完整,不过还缺少一样,没有对接收到的消息进行提醒和显示,接下来我们介绍消息的接收。
接收消息并查看更改通知
现在我们要注册一个 Receiver 来接收消息并查看组中成员的变动。我们可以实现 org.jgroups.Receiver,不过这里我选用直接继承 ReceiverAdapter 类,因为该类已经有一些默认的实现方法了。然后只需要重载回调函数 receive() 和 viewChange() 即可:
public class SimpleChat extends ReceiverAdapter {
设置接收器的 start() 方法:
private void start() throws Exception { channel=new JChannel(); channel.setReceiver(this); channel.connect("ChatCluster"); eventLoop(); channel.close(); }实现 receive() 和 viewAccepted() 方法:
public void viewAccepted(View new_view) { System.out.println("** view: " + new_view); } public void receive(Message msg) { System.out.println(msg.getSrc() + ": " + msg.getObject()); }
viewAccepted() 回调函数会在新成员加入组中,或者已有成员崩溃了或离开组时被调用。其 toString() 方法会打印 View ID(也就是成员ID)以及当前成员列表。
在 receive() 方法中我们可以得到一个 Message 的参数,只需要读取其缓冲区内容并输出到控制台,同时我们也把发送者的地址打印出来 (Message.getSrc()).
注意我们也可以通过调用 Message.getBuffer() 来获取消息实体中的 byte[] 数据然后通过自己的反序列化来处理,例如 String line=new String(msg.getBuffer()).
测试 SimpleChat 程序
现在我们这个演示程序的所有功能均已完成,可使用如下命令来运行试试(译者注:请自行将 jgroup-xxx.jar 添加到类路径):
[linux]/home/bela$ java SimpleChat ------------------------------------------------------------------- GMS: address=linux-48776, cluster=ChatCluster, physical address=192.168.1.5:42442 ------------------------------------------------------------------- ** view: [linux-48776|0] [linux-48776] >
我们启动的实例名是 linux-48776 ,其物理地址是 192.168.1.5:42442 (IP address:port). 这个名字是由 JGroups 来生成的(如果用户没有设置的话,通常是主机名加一个随机的数字),该名字一直存在并在整个生命周期中保持不变,同时映射到一个基础的 UUID,而 UUID 映射到物理地址。
接下来启动第二个实例:
[linux]/home/bela$ java SimpleChat ------------------------------------------------------------------- GMS: address=linux-37238, cluster=ChatCluster, physical address=192.168.1.5:40710 ------------------------------------------------------------------- ** view: [linux-48776|1] [linux-48776, linux-37238] >
现在组中有两个成员,包括 [linux-48776, linux-37238], 这里显示的是加入组中两个成员的名称。注意第一个成员 (linux-48776) 同样也接收到相同的 view,因此两个成员包含了同样的成员列表,顺序也一致。实例是根据加入组中的先后顺序列表的,因此最早加入的成员排在最前面。
只需要输入消息并按回车键就可以发送消息。消息被发送到组中,因此组中所有成员都将接收到该消息,包括发送者自己。
当输入 exit 或者 quit 并按回车,实例将会退出组。
为了模拟成员崩溃的情况(例如在控制台中按 Ctrl + C),其他的幸存者将接收到一个新的 view ,而当前的组中只有一个成员。
额外信息:维护共享的组数据
JGroups 另外一个使用场景是可以帮你维护一份可在组中共享的数据。例如,Web 服务器中的所有 HTTP 会话。如果这些会话通过组来复制,那么客户端就可以访问组中的任意成员来获得这些数据,就算某个成员崩溃或者退出了,这些数据依然可用。
对会话的更新也会通过组进行复制,例如某个序列化属性被修改了,那么其他成员也会得知这个修改,这样就使得组中所有成员包含了相同的状态。
可当组中加入一个新成员会怎么样呢?新加入的成员必须通过某些方法来获取这些状态数据,这被称为是“状态传输”。
JGroups 的状态传输是通过两个回调函数来实现的 (getState() and setState()) ,我们需要调用 getState() 方法来获取状态数据。注意,为了在应用中使用状态传输,协议堆栈必须有一个状态传输协议(我们这个演示程序使用了默认的堆栈)。
我们修改一下 start() 方法,加入 getState() 的调用:
private void start() throws Exception { channel=new JChannel(); channel.setReceiver(this); channel.connect("ChatCluster"); channel.getState(null, 10000); eventLoop(); channel.close(); }getState() 方法第一个参数是目标成员,null 表示首个成员(协调者)。第二个参数是超时时间,这里我们设置了 10 秒钟的超时时间,以为着状态传输的时间必须在 10 秒内完成,否则将会抛出异常,0 代表没有超时时间。
ReceiverAdapter 定义了 getState() 回调函数,当组中实例(一般是第一个实例,或者也叫协调者)收到一个已有实例要获取组状态时被调用。在我们的示例程序中,我们为聊天会话定义了一个状态,这是一个简单的列表,包含最新的几条聊天信息(这个可能不是一个好的组状态的例子,因为这个状态数据一直在增长).
聊天信息列表并定义为实例变量:
final List<String> state=new LinkedList<String>();
我们还需要修改 receive() 方法来将接收到的消息追加到状态数据中:
public void receive(Message msg) { String line=msg.getSrc() + ": " + msg.getObject(); System.out.println(line); synchronized(state) { state.add(line); } }getState() 回调函数实现如下:
public void getState(OutputStream output) throws Exception { synchronized(state) { Util.objectToStream(state, new DataOutputStream(output)); } }getState() 方法在 “状态提供者” 里调用,当实例返回状态数据后会被转换成输出流。JGroups 会在状态数据写入完毕后关闭流,就算有异常发送也会这样,因此你不需要自己来关闭流。
因为访问状态数据可能是并发的,我们必须做同步控制。然后调用 Util.objectToStream() 这个工具方法来将对象写入流中。
setState() 方法在“状态请求者”处调用,也就是调用了 getState() 方法的成员上,其任务就是从输入流中读取状态数据并保存:
public void setState(InputStream input) throws Exception { List<String> list; list=(List<String>)Util.objectFromStream(new DataInputStream(input)); synchronized(state) { state.clear(); state.addAll(list); } System.out.println(list.size() + " messages in chat history):"); for(String str: list) { System.out.println(str); } }
我们再一次调用 JGroups 工具方法 (Util.objectFromStream()) 来从输入流中创建一个对象,然后对 state 同步并赋值。
在接收完状态数据后,我们打印了状态数据中的聊天信息数。注意这里并没有处理超大列表的情况,可能会发生不可预知的问题。
结论
在这篇教程中,我们向你展示了如何创建、加入和离开组,并给组成员发送和接收消息,获取组成员的变化情况,同时实现了状态的传输。所有这些都是通过 JGroups 的核心 API —— JChannel 和 Receiver 提供的。
还有两个关于 JGroups 方面的内容没有涉及到的,分别是:构建块 (Building blocks) 和协议堆栈。
构建块是一些类继承自 JChannel 提供了更高级别的抽象层,例如请求响应、组范围内的方法调用、复制哈希等等。
而协议堆栈允许对 JGroups 底层通讯协议进行定制,包括配置、移除、增强或者重写全新的协议。
SimpleChat 的代码可从 这里 获取。
这里还有一些其他 JGroups 相关的资料:
- SimpleChat code: SimpleChat.java
- JGroups web site: http://www.jgroups.org
- Downloads: here
- JIRA bug tracking: http://jira.jboss.com/jira/browse/JGRP
- Mailing lists: http://sourceforge.net/mail/?group_id=6081