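JBoss Messaging Bridge forwards messages from one JBoss server to another: messages are sent to one location (the Source Destination) and consumed from another (the Target Destination). Both Queue and Topic modes are supported.
The test below is mainly based on How to Configure JBoss Messaging Bridge in JBoss AS 5 ?.
Preparing the Runtime Environment
The test needs two JBoss servers, so the environment is built by adding JBoss nodes; for how to add nodes, see another article: JBoss EAP 5 Multi Apps Services.
JBoss Messaging 1.4.6.GA is listed below only to record the version number. It does not need to be installed separately, because JBoss EAP 5.0 already includes JBoss Messaging.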
- JDK (Java Development Kit) version 1.6+
- JBoss Enterprise Application Platform 5.0
- JBoss Messaging 1.4.6.GA
JBoss Messaging Bridge sample explanation
- Source Destination
  - Receives the messages sent by the Sender
  - JBoss_Node-2 = appService = 127.0.0.1:1199
  - JNDIName: TestQ
  - SourceDestinationLookup: /TestQ
- Target Destination
  - Receives the messages forwarded from the Source Destination; the Receiver consumes messages from the Target
  - JBoss_Node-1 = default = 127.0.0.1:1099
  - JNDIName: remoteTestQ
  - TargetDestinationLookup: /remoteLocation/remoteTestQ
JBoss Messaging Bridge Configuration
- On JBoss_Node-1, add a Queue Service configuration: create %JBOSS_HOME%\server\default\deploy\TestQueue-service.xml and set the queue name to remoteTestQ.
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<server>
<mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=remoteTestQ" code="org.jboss.jms.server.destination.QueueService">
<attribute name="DLQ">jboss.messaging.destination:name=DLQ,service=Queue</attribute>
<attribute name="ExpiryQueue">jboss.messaging.destination:name=ExpiryQueue,service=Queue</attribute>
<attribute name="JNDIName">remoteTestQ</attribute>
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
</mbean>
</server>
- On JBoss_Node-2, add the Remote Messaging Server configuration. From JBoss_Node-2's point of view, the Remote Messaging Server is JBoss_Node-1. Create %JBOSS_HOME%\server\appService\deploy\remote-jms-ds.xml and set the server address to 127.0.0.1:1099.
<?xml version="1.0" encoding="UTF-8"?>
<connection-factories>
<mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider">
<attribute name="ProviderName">RemoteJMSProvider</attribute>
<attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
<attribute name="FactoryRef">remoteLocation/ConnectionFactory</attribute>
<attribute name="QueueFactoryRef">remoteLocation/ConnectionFactory</attribute>
<attribute name="TopicFactoryRef">remoteLocation/ConnectionFactory</attribute>
<depends>jboss.jndi:service=ExternalContext,jndiName=remoteLocation</depends>
</mbean>
<mbean code="org.jboss.naming.ExternalContext" name="jboss.jndi:service=ExternalContext,jndiName=remoteLocation">
<attribute name="JndiName">remoteLocation</attribute>
<attribute name="Properties">
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
java.naming.provider.url=jnp://127.0.0.1:1099
</attribute>
</mbean>
</connection-factories>
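The Properties block of the ExternalContext above is the same JNDI environment a standalone client would build by hand. As a minimal sketch (the class RemoteJndiEnv and its method are hypothetical names, not part of JBoss), the three entries map onto the javax.naming.Context constants like this:

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper: builds the JNDI environment that the
// ExternalContext "Properties" attribute expresses in XML.
class RemoteJndiEnv {

    // host and port identify the remote node's JNP naming service,
    // e.g. 127.0.0.1:1099 for JBoss_Node-1 in this article.
    static Properties forNode(String host, int port) {
        Properties env = new Properties();
        // java.naming.factory.initial
        env.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.jnp.interfaces.NamingContextFactory");
        // java.naming.factory.url.pkgs
        env.setProperty(Context.URL_PKG_PREFIXES,
                "org.jboss.naming:org.jnp.interfaces");
        // java.naming.provider.url
        env.setProperty(Context.PROVIDER_URL, "jnp://" + host + ":" + port);
        return env;
    }

    public static void main(String[] args) {
        Properties env = forNode("127.0.0.1", 1099);
        // Print each entry in the key=value form used in remote-jms-ds.xml
        for (String key : env.stringPropertyNames()) {
            System.out.println(key + "=" + env.getProperty(key));
        }
    }
}
```

The sketch only builds the environment; it does not open a connection, so it runs without the JBoss client jars on the classpath.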
- On JBoss_Node-2, add a Queue Service configuration: create %JBOSS_HOME%\server\appService\deploy\TestQ-service.xml and set the queue name to TestQ.
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<server>
<mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=TestQ" code="org.jboss.jms.server.destination.QueueService">
<attribute name="DLQ">jboss.messaging.destination:name=DLQ,service=Queue</attribute>
<attribute name="ExpiryQueue">jboss.messaging.destination:name=ExpiryQueue,service=Queue</attribute>
<attribute name="JNDIName">TestQ</attribute>
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
</mbean>
</server>
- On JBoss_Node-2, add the Messaging Bridge Service configuration: create %JBOSS_HOME%\server\appService\deploy\TestBridge-service.xml with SourceDestinationLookup set to /TestQ and TargetDestinationLookup set to /remoteLocation/remoteTestQ. The /remoteLocation prefix of the target lookup refers to the Remote Messaging Server configured in remote-jms-ds.xml.
<?xml version="1.0" encoding="UTF-8"?>
<server>
<mbean code="org.jboss.jms.server.bridge.BridgeService" name="jboss.messaging:service=Bridge,name=TestBridge" xmbean-dd="xmdesc/Bridge-xmbean.xml">
<depends optional-attribute-name="SourceProviderLoader">jboss.messaging:service=JMSProviderLoader,name=JMSProvider</depends>
<depends optional-attribute-name="TargetProviderLoader">jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider</depends>
<attribute name="SourceDestinationLookup">/TestQ</attribute>
<attribute name="TargetDestinationLookup">/remoteLocation/remoteTestQ</attribute>
<attribute name="QualityOfServiceMode">1</attribute>
<attribute name="MaxBatchSize">5</attribute>
<attribute name="MaxBatchTime">1</attribute>
<attribute name="FailureRetryInterval">5000</attribute>
<attribute name="MaxRetries">-1</attribute>
<attribute name="AddMessageIDInHeader">false</attribute>
</mbean>
</server>
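A typo in either lookup only shows up once the bridge starts, so it can help to read the descriptor back before deploying it. The following is a minimal sketch using only the JDK's XML parser; the class name BridgeDescriptorCheck and its method are hypothetical, and it assumes the `<attribute name="...">` layout shown above.

```java
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper, not part of JBoss: lists the <attribute> values
// of a -service.xml descriptor so the lookups can be checked by eye.
class BridgeDescriptorCheck {

    // Collects every <attribute name="X">value</attribute> into a map,
    // keeping the order in which the attributes appear in the file.
    static Map<String, String> readAttributes(InputStream xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder().parse(xml);
        NodeList nodes = doc.getElementsByTagName("attribute");
        Map<String, String> result = new LinkedHashMap<String, String>();
        for (int i = 0; i < nodes.getLength(); i++) {
            Element e = (Element) nodes.item(i);
            result.put(e.getAttribute("name"), e.getTextContent().trim());
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // With no argument, a trimmed copy of the descriptor above is used.
        String sample = "<server><mbean>"
                + "<attribute name=\"SourceDestinationLookup\">/TestQ</attribute>"
                + "<attribute name=\"TargetDestinationLookup\">/remoteLocation/remoteTestQ</attribute>"
                + "<attribute name=\"QualityOfServiceMode\">1</attribute>"
                + "</mbean></server>";
        InputStream in = args.length == 1
                ? new FileInputStream(args[0])
                : new ByteArrayInputStream(sample.getBytes("UTF-8"));
        try {
            for (Map.Entry<String, String> e : readAttributes(in).entrySet()) {
                System.out.println(e.getKey() + " = " + e.getValue());
            }
        } finally {
            in.close();
        }
    }
}
```

Passing the path of TestBridge-service.xml as the single argument prints the attributes of the real file instead of the built-in sample.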
Testing Messaging Send & Receive
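Next, add two Java samples to test sending and receiving messages.
- Create QueueSend.java with the following content. It tests a Sender sending messages to QUEUE:TestQ; as described above, TestQ is the queue configured on JBoss_Node-2 (Source Destination).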
import java.io.*;
import java.util.Hashtable;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class QueueSend
{
public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
//*************** Connection Factory JNDI name *************************
public final static String JMS_FACTORY="/ConnectionFactory";
//*************** Queue JNDI name *************************
public final static String QUEUE="/TestQ";
private QueueConnectionFactory qconFactory;
private QueueConnection qcon;
private QueueSession qsession;
private QueueSender qsender;
private Queue queue;
private TextMessage msg;
public void init(Context ctx, String queueName)throws NamingException, JMSException
{
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
qcon = qconFactory.createQueueConnection();
qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) ctx.lookup(queueName);
qsender = qsession.createSender(queue);
msg = qsession.createTextMessage();
qcon.start();
}
public void send(String message) throws JMSException {
msg.setText(message);
qsender.send(msg);
}
public void close() throws JMSException {
qsender.close();
qsession.close();
qcon.close();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: java QueueSend URL");
return;
}
InitialContext ic = getInitialContext(args[0]);
QueueSend qs = new QueueSend();
qs.init(ic, QUEUE);
readAndSend(qs);
qs.close();
}
private static void readAndSend(QueueSend qs) throws IOException, JMSException
{
BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
System.out.println("\n\tStart Sending Messages (Enter QUIT to Stop):\n");
while(true)
{
System.out.print("<msg_sender> ");
String msg=br.readLine();
// readLine() returns null at end of input; stop without sending anything
if(msg == null)
{
break;
}
qs.send(msg);
// The quit message is forwarded as well, so the receiver also stops
if(msg.equalsIgnoreCase("quit"))
{
break;
}
System.out.println();
}
// Returning (instead of System.exit) lets main() close the JMS resources
br.close();
}
private static InitialContext getInitialContext(String url) throws NamingException
{
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
env.put(Context.PROVIDER_URL, url);
return new InitialContext(env);
}
}
- Create QueueReceive.java with the following content. It tests a Receiver consuming messages from QUEUE:remoteTestQ; as described above, remoteTestQ is the queue configured on JBoss_Node-1 (Target Destination).
import java.util.Hashtable;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class QueueReceive implements MessageListener
{
public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
//*************** Connection Factory JNDI name *************************
public final static String JMS_FACTORY="/ConnectionFactory";
//*************** Queue JNDI name *************************
public final static String QUEUE="/remoteTestQ";
private QueueConnectionFactory qconFactory;
private QueueConnection qcon;
private QueueSession qsession;
private QueueReceiver qreceiver;
private Queue queue;
private boolean quit = false;
public void onMessage(Message msg)
{
try {
String msgText;
if (msg instanceof TextMessage)
{
msgText = ((TextMessage)msg).getText();
}
else
{
msgText = msg.toString();
}
System.out.println("\n\t<Msg_Receiver> "+ msgText );
if (msgText.equalsIgnoreCase("quit"))
{
synchronized(this)
{
quit = true;
this.notifyAll(); // Notify main thread to quit
}
}
}
catch (JMSException jmse)
{
jmse.printStackTrace();
}
}
public void init(Context ctx, String queueName) throws NamingException, JMSException
{
qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
qcon = qconFactory.createQueueConnection();
qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
queue = (Queue) ctx.lookup(queueName);
qreceiver = qsession.createReceiver(queue);
qreceiver.setMessageListener(this);
qcon.start();
}
public void close()throws JMSException
{
qreceiver.close();
qsession.close();
qcon.close();
}
public static void main(String[] args) throws Exception
{
if (args.length != 1)
{
System.out.println("Usage: java QueueReceive URL");
return;
}
InitialContext ic = getInitialContext(args[0]);
QueueReceive qr = new QueueReceive();
qr.init(ic, QUEUE);
System.out.println("JMS Ready To Receive Messages (To quit, send a \"quit\" message from QueueSend).");
// Wait until a "quit" message has been received.
synchronized(qr)
{
while (! qr.quit)
{
try
{
qr.wait();
}
catch (InterruptedException ie)
{}
}
}
qr.close();
}
private static InitialContext getInitialContext(String url) throws NamingException
{
Hashtable<String, String> env = new Hashtable<String, String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
env.put(Context.PROVIDER_URL, url);
return new InitialContext(env);
}
}
- Start JBoss_Node-1 first and then JBoss_Node-2. The Remote Messaging Server configuration on Node-2 connects to Node-1, so the start order matters.
%JBOSS_HOME%\bin\run.bat
%JBOSS_HOME%\appService\bin\runAppService.bat
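Because Node-2 needs Node-1's naming service to be reachable, one way to avoid starting it too early is to probe the JNP port first. This is a minimal sketch assuming the port used in this article (1099 for JBoss_Node-1); PortProbe is a hypothetical helper, and an open port only shows that the socket is listening, not that every service on the node has finished deploying.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: treat as "not up yet"
            return false;
        } finally {
            try {
                socket.close();
            } catch (IOException ignored) {
                // Nothing useful to do if closing the probe socket fails
            }
        }
    }

    public static void main(String[] args) {
        boolean up = isListening("127.0.0.1", 1099, 2000);
        System.out.println("JBoss_Node-1 JNP port 1099 reachable: " + up);
    }
}
```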
- Create a run.bat file to compile and run QueueSend.java and QueueReceive.java.
@echo off
time /t
set JBOSS_HOME=C:\jboss-eap-5.0\jboss-as
javac -cp %CLASSPATH%;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueSend.java
rem java -cp %CLASSPATH%;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueSend jnp://127.0.0.1:1199
rem javac -cp %CLASSPATH%;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueReceive.java
rem java -cp %CLASSPATH%;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueReceive jnp://127.0.0.1:1099
time /t
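- The result is shown below. The sample has one small quirk: entering quit to end the Sender also ends the Receiver, because the Receiver decides whether to exit based on the text it receives. This does not affect the purpose of the test.
Messages entered and sent to JBoss_Node-2
Messages received on JBoss_Node-1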
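- A Receiver can also consume from QUEUE:TestQ on JBoss_Node-2. However, a queue delivers each message to only one Receiver, so a message consumed from QUEUE:TestQ never reaches QUEUE:remoteTestQ. In testing, with both Receivers listening at the same time, they received messages in turn; this is what was observed over roughly 10 runs.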
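The queue behaviour described here (each message goes to exactly one receiver, even when two are listening) can be illustrated without JMS. The sketch below is only an in-memory analogy built on java.util.concurrent, not JBoss Messaging code, and all names in it are made up for illustration. How a real broker splits messages between consumers is up to the broker, so the split printed here says nothing about JBoss itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy of two receivers competing for one queue.
class CompetingConsumersDemo {
    private static final String STOP = "__stop__";

    // Puts messageCount messages on one queue and drains it with two consumers.
    // Returns the messages each consumer received (index 0 and index 1).
    static List<List<String>> run(int messageCount) throws InterruptedException {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        List<List<String>> received = new ArrayList<List<String>>();
        List<Thread> consumers = new ArrayList<Thread>();
        for (int i = 0; i < 2; i++) {
            final List<String> mine =
                    Collections.synchronizedList(new ArrayList<String>());
            received.add(mine);
            Thread t = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            // take() removes the message, so no other consumer sees it
                            String msg = queue.take();
                            if (STOP.equals(msg)) {
                                return;
                            }
                            mine.add(msg);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            consumers.add(t);
            t.start();
        }
        for (int i = 1; i <= messageCount; i++) {
            queue.put("msg-" + i);
        }
        // One stop marker per consumer, queued after all real messages
        for (int i = 0; i < 2; i++) {
            queue.put(STOP);
        }
        for (Thread t : consumers) {
            t.join();
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        List<List<String>> received = run(10);
        System.out.println("receiver A got " + received.get(0));
        System.out.println("receiver B got " + received.get(1));
    }
}
```

Whatever the split turns out to be on a given run, the two lists together contain every message exactly once, which is the property that matters for the observation above.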
Persistent Messages Mode Configuration (2013.01.24)
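This part was added later. A Messaging Bridge can achieve Persistent Messages by setting its Quality of Service (QoS) level. QoS has three settings: QOS_AT_MOST_ONCE, QOS_DUPLICATES_OK and QOS_ONCE_AND_ONLY_ONCE (values 0 to 2, in that order; see the official documentation for details). The sample at the start of this article uses QOS_DUPLICATES_OK mode. According to the official documentation, in this mode the Target Destination may receive duplicates, but every message is guaranteed to arrive. QOS_ONCE_AND_ONLY_ONCE mode guarantees that each message is received exactly once. It is built on JTA transactions, so the configuration needs corresponding changes. All of the settings below are made on JBoss_Node-2.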
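To keep the numeric QualityOfServiceMode values readable, the mapping described above can be written down as an enum. This is only an illustrative sketch: BridgeQosMode and its methods are hypothetical names, not a JBoss API, and the requiresXaConnectionFactory flag simply restates the requirement this article attributes to the official documentation.

```java
// Hypothetical enum mirroring the three QualityOfServiceMode values (0 to 2).
enum BridgeQosMode {
    QOS_AT_MOST_ONCE(0),
    QOS_DUPLICATES_OK(1),
    QOS_ONCE_AND_ONLY_ONCE(2);

    private final int value;

    BridgeQosMode(int value) {
        this.value = value;
    }

    // The number written into <attribute name="QualityOfServiceMode">
    int value() {
        return value;
    }

    // Only the once-and-only-once mode needs XAConnectionFactory, per this article
    boolean requiresXaConnectionFactory() {
        return this == QOS_ONCE_AND_ONLY_ONCE;
    }

    // Looks up the mode for a configured number; rejects anything outside 0 to 2
    static BridgeQosMode fromValue(int value) {
        for (BridgeQosMode mode : values()) {
            if (mode.value == value) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown QualityOfServiceMode: " + value);
    }

    public static void main(String[] args) {
        for (BridgeQosMode mode : values()) {
            System.out.println(mode.value() + " = " + mode
                    + ", XA required: " + mode.requiresXaConnectionFactory());
        }
    }
}
```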
- Modify %JBOSS_HOME%\server\appService\deploy\messaging\messaging-service.xml (excerpt below) and change the ServerPeerID parameter. As the comment in the XML says, it must be unique for each node. This sample has two JBoss nodes and JBoss_Node-1 is left unchanged, so change ${jboss.messaging.ServerPeerID:0} to ${jboss.messaging.ServerPeerID:1} and save the file.
<?xml version="1.0" encoding="UTF-8"?>
<!--
The JBoss Messaging service deployment descriptor.
$Id: messaging-service.xml 85945 2009-03-16 19:45:12Z dimitris@jboss.org $
-->
<server>
<!-- ServerPeer MBean configuration
============================== -->
<mbean code="org.jboss.jms.server.ServerPeer"
name="jboss.messaging:service=ServerPeer"
xmbean-dd="xmdesc/ServerPeer-xmbean.xml">
<!-- The unique id of the server peer - in a cluster each node MUST have a unique value - must be an integer -->
<attribute name="ServerPeerID">${jboss.messaging.ServerPeerID:1}</attribute>
....
</mbean>
</server>
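The ${jboss.messaging.ServerPeerID:1} value reads as "use the system property jboss.messaging.ServerPeerID if it is set, otherwise 1". The sketch below illustrates that rule only; it is not JBoss's own substitution code, PlaceholderDemo is a hypothetical name, and leaving an unresolved placeholder untouched is a choice made for this example.

```java
import java.util.Properties;

// Hypothetical illustration of the "${name:default}" placeholder rule.
class PlaceholderDemo {

    // Resolves a single "${name}" or "${name:default}" against props.
    // Text that is not a placeholder is returned unchanged.
    static String resolve(String text, Properties props) {
        if (!text.startsWith("${") || !text.endsWith("}")) {
            return text;
        }
        String body = text.substring(2, text.length() - 1);
        int colon = body.indexOf(':');
        String name = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);
        String value = props.getProperty(name);
        if (value != null) {
            return value;
        }
        // No property and no default: keep the placeholder as written
        return fallback != null ? fallback : text;
    }

    public static void main(String[] args) {
        String placeholder = "${jboss.messaging.ServerPeerID:1}";
        // Uses the JVM's system properties, so -Djboss.messaging.ServerPeerID=5
        // on the command line would change what is printed.
        System.out.println(resolve(placeholder, System.getProperties()));
    }
}
```

If the placeholder behaves as sketched, the ID could presumably also be supplied as a -D system property at startup instead of editing the file, although that was not tried in this article.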
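- Modify %JBOSS_HOME%\server\appService\deploy\remote-jms-ds.xml so that lines 6 to 8 all use XAConnectionFactory. According to the official documentation, QOS_ONCE_AND_ONLY_ONCE mode requires XAConnectionFactory in order to get XA recovery with JBoss Transactions. In actual testing, deployment and execution work even without switching to XAConnectionFactory, but if JBoss_Node-1 is restarted, the Bridge on JBoss_Node-2 cannot reconnect to it, and JBoss_Node-2 must be restarted as well to re-establish the connection. The remote connection to JBoss_Node-1 is now configured through the Properties attribute (lines 9 to 13).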
<?xml version="1.0" encoding="UTF-8"?>
<connection-factories>
<mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider">
<attribute name="ProviderName">RemoteJMSProvider</attribute>
<attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
<attribute name="FactoryRef">/XAConnectionFactory</attribute>
<attribute name="QueueFactoryRef">/XAConnectionFactory</attribute>
<attribute name="TopicFactoryRef">/XAConnectionFactory</attribute>
<attribute name="Properties">
java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
java.naming.provider.url=jnp://127.0.0.1:1099
</attribute>
</mbean>
</connection-factories>
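- Modify %JBOSS_HOME%\server\appService\deploy\TestBridge-service.xml and set QualityOfServiceMode to 2, which is QOS_ONCE_AND_ONLY_ONCE mode. Regarding the XAConnectionFactory setting mentioned in the previous step: RemoteJMSProvider was already configured there, and for JMSProvider see %JBOSS_HOME%\server\appService\deploy\messaging\jms-ds.xml, which uses XAConnectionFactory by default (note: a name starting with java: denotes a local setting). Because the remoteLocation JndiName in remote-jms-ds.xml is no longer defined (the ExternalContext was replaced by the Properties attribute), TargetDestinationLookup also changes to /remoteTestQ.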
<?xml version="1.0" encoding="UTF-8"?>
<server>
<mbean code="org.jboss.jms.server.bridge.BridgeService" name="jboss.messaging:service=Bridge,name=TestBridge" xmbean-dd="xmdesc/Bridge-xmbean.xml">
<depends optional-attribute-name="SourceProviderLoader">jboss.messaging:service=JMSProviderLoader,name=JMSProvider</depends>
<depends optional-attribute-name="TargetProviderLoader">jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider</depends>
<attribute name="SourceDestinationLookup">/TestQ</attribute>
<attribute name="TargetDestinationLookup">/remoteTestQ</attribute>
<attribute name="QualityOfServiceMode">2</attribute>
<attribute name="MaxBatchSize">5</attribute>
<attribute name="MaxBatchTime">1</attribute>
<attribute name="FailureRetryInterval">5000</attribute>
<attribute name="MaxRetries">-1</attribute>
<attribute name="AddMessageIDInHeader">false</attribute>
</mbean>
</server>
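- After these three changes, test by following the earlier steps. The point of the test is that no messages are lost between JBoss_Node-2 and JBoss_Node-1. The screenshot in the original post shows the log on JBoss_Node-2 while JBoss_Node-1 restarts: once JBoss_Node-1 is up again, the Bridge reconnecting message appears (the retry interval is set by the FailureRetryInterval parameter in TestBridge-service.xml). How to guarantee delivery of the messages sent to JBoss_Node-2 is the Sender's responsibility and is not discussed here.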
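The reconnect behaviour governed by FailureRetryInterval and MaxRetries can be pictured as a simple loop. The sketch below is a generic retry loop, not the Bridge's real implementation; RetryLoopSketch is a hypothetical name, and treating a negative MaxRetries (the -1 used in TestBridge-service.xml) as "retry forever" is an assumption made for this illustration.

```java
import java.util.concurrent.Callable;

// Generic retry loop illustrating FailureRetryInterval / MaxRetries.
class RetryLoopSketch {

    // Calls attempt until it returns true.
    // retryIntervalMillis plays the role of FailureRetryInterval.
    // maxRetries < 0 is assumed to mean "never give up".
    // Returns the number of attempts made, or -1 if the retries ran out.
    static int connectWithRetry(Callable<Boolean> attempt,
                                long retryIntervalMillis,
                                int maxRetries) throws InterruptedException {
        int attempts = 0;
        while (true) {
            attempts++;
            boolean connected;
            try {
                connected = attempt.call();
            } catch (Exception e) {
                // A failed connection attempt counts the same as "false"
                connected = false;
            }
            if (connected) {
                return attempts;
            }
            // The first attempt is not a retry, so attempts - 1 retries were used
            if (maxRetries >= 0 && attempts - 1 >= maxRetries) {
                return -1;
            }
            Thread.sleep(retryIntervalMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final int[] calls = {0};
        // Stand-in for "JBoss_Node-1 is back": succeeds on the third call
        Callable<Boolean> nodeOneIsUp = new Callable<Boolean>() {
            public Boolean call() {
                calls[0]++;
                return calls[0] >= 3;
            }
        };
        int attempts = connectWithRetry(nodeOneIsUp, 100L, -1);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```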
References:
How to Configure JBoss Messaging Bridge in JBoss AS 5 ?
JBoss Messaging 2.0 Quickstart Guide
Spring Gossip: 訊息(Message)觀念
Jboss JMS Bridge (Jboss to Jboss)