Configure JBoss Messaging Bridge 說明的是 Queue 的傳送模式,接下來延續的這一篇改用 Topic 的傳送模式並且須為 Persistent Messages ,而前一篇關於 Persistent Messages for Queue 的說明是在進行這一篇測試時,做為先期測試完成後再補上的,因主題上比較相近皆為 Queue 的說明因此擺在同一篇說明裡。
Messaging Bridge 可透過設定
Quality of Service (QoS)
Level 來達到
Persistent Messages
,
QoS
共有三個設定:
QOS_AT_MOST_ONCE、QOS_DUPLICATES_OK、QOS_ONCE_AND_ONLY_ONCE
(設定值依序為0~2,詳見官網說明),
QOS_DUPLICATES_OK
mode 依官網的說明這個 mode ,
Target Destination
有可能會收到重覆的資料但確保訊息都會收到,
QOS_ONCE_AND_ONLY_ONCE
mode 則確保會收到一次,此 mode 是架構在
JTA transaction
因此配置上要有相對應的修改(這一段節錄自
Configure JBoss Messaging Bridge) 。
準備執行環境
- JDK (Java Development Kit) version 1.6+
- JBoss Enterprise Application Platform 5.0
- JBoss Messaging 1.4.6.GA
- MySQL Server 5.1
JBoss Messaging Bridge for Topic sample explanation
Queue 的傳送模式為點對點,因此
Source/Target
的說明適合 Queue mode ,但 Topic 可以有多個 Subscriber ,也就是說
Source/Target
二地的 Topic 都可以有多個 Subscriber ,不過還是維持這種說明方式,方便設定上的說明。
- Source Destination
- 是用來接收 Publisher 送出來的 message
- JBoss_Node-2 =
appService
= 127.0.0.1:1199
- JNDIName:
LocalTopic
- SourceDestinationLookup:
/LocalTopic
- Target Destination
- 是用來接收
Source Destination
轉送過來的 message,Subscriber 由 Target 接收 message
- JBoss_Node-1 =
default
= 127.0.0.1:1099
- JNDIName:
RemoteTopic
- TargetDestinationLookup:
/RemoteTopic
JBoss Messaging Bridge for Topic Configuration
-
XA recovery with JBoss Transactions
的設定,參考 Configure JBoss Messaging Bridge 關於 Persistent Messages
的說明,來修改 %JBOSS_HOME%\server\appService\deploy\messaging\messaging-service.xml
及 %JBOSS_HOME%\server\appService\deploy\remote-jms-ds.xml
這二個設定檔, messaging-service.xml
設定 ServerPeerID
,remote-jms-ds.xml
設定 XAConnectionFactory
。
- 於
JBoss_Node-1
新增 Topic Service 設定,新增 %JBOSS_HOME%\server\default\deploy\RemoteTopic-service.xml
,TopicName 設定為 RemoteTopic
,設定 SecurityConfig
讓 subscriber
role 具有 read/write/create
的權限, read/write
是指對 Message ,create
是指對 Durable Subscriptions , non Durable Subscriptions 則不需要有 Create 的動作。
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<server>
<mbean xmbean-dd="xmdesc/Topic-xmbean.xml" name="jboss.messaging.destination:service=Topic,name=RemoteTopic" code="org.jboss.jms.server.destination.TopicService">
<attribute name="JNDIName">RemoteTopic</attribute>
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
<attribute name="Clustered">true</attribute>
<attribute name="SecurityConfig">
<security>
<role name="subscriber" read="true" write="true" create="true"/>
</security>
</attribute>
</mbean>
</server>
於 JBoss_Node-1
的 DataSource
設定 Role-User Mapping
和 Username/Password
, 於 jbm_role
Table 新增 subscriber/mary
, jbm_user
Table 新增 mary/hotdog
,接下來會使用 User:mary
,而 RemoteTopic
的 SecurityConfig
也配合設定 Role:subscriber
;而其他預設的 Role-User Mapping
可參考 %JBOSS_HOME%\server\default\deploy\messaging\mysql-persistence-service.xml
。
- 於
JBoss_Node-2
新增 Topic Service 設定,新增 %JBOSS_HOME%\server\appService\deploy\LocalTopic-service.xml
,TopicName 設定為 LocalTopic
,設定 SecurityConfig
為 durpublisher
role。
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<server>
<mbean xmbean-dd="xmdesc/Topic-xmbean.xml" name="jboss.messaging.destination:service=Topic,name=LocalTopic" code="org.jboss.jms.server.destination.TopicService">
<attribute name="JNDIName">LocalTopic</attribute>
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
<attribute name="Clustered">true</attribute>
<attribute name="SecurityConfig">
<security>
<role name="durpublisher" read="true" write="true" create="true"/>
</security>
</attribute>
</mbean>
</server>
比照前一點於 JBoss_Node-2
設定 Role-User Mapping
和 Username/Password
, 於 jbm_role
Table 新增 durpublisher/bob
, jbm_user
Table 新增 bob/cheesecake
,接下來會使用 User:bob
,而 LocalTopic
的 SecurityConfig
也配合設定 Role:durpublisher
。
- 於
JBoss_Node-2
新增 Topic Messaging Bridge Service 設定,新增 %JBOSS_HOME%\server\appService\deploy\TopicBridge-service.xml
。
- BridgeService Name:
TopicBridge
。
- SourceDestinationLookup:
/LocalTopic
、TargetDestinationLookup:/RemoteTopic
。
QualityOfServiceMode
設定為 2 也就是 QOS_ONCE_AND_ONLY_ONCE
mode。
SourceUsername/SourcePassword
設定第3點於 JBoss_Node-2
新增的 User: bob/cheesecake
,TargetUsername/TargetPassword
設定第2點於 JBoss_Node-1
新增的 User: mary/hotdog
。
- 如不新增 User 也可直接設定 User:
guest
,但無法使用 User:john
因 ClientID 已設定為 DurableSubscriberExample
,而這裡為了測試說明因此設定不同的 User。
- ClientID:
clientNode2
、SubName:localDurableSub1
,SubName - Durable Subscription Name:長期訂閱者的名稱,以 ClientID + SubscriptionName
來識別訂閱者,可以用同一個 ClientID
搭配不同的 Subscription
,或是一對一的搭配。
- 只要 Subscriber 不給定一個名稱就是
Non Durable Subscriptions
(非長期訂閱),而非長期訂閱要是訂閱者不在線上的期間會遺失訊息,因此如果要設定為 Persistent Messages
則 ClientID/SubName
都須要設定,也就是說 BridgeService
是使用 localDurableSub1
在接收訊息,再將接收的訊息傳給 JBoss_Node-1
,因此如不設定 Subscription
則 JBoss_Node-1
Stop 期間會遺失訊息。
<?xml version="1.0" encoding="UTF-8"?>
<server>
<mbean code="org.jboss.jms.server.bridge.BridgeService" name="jboss.messaging:service=Bridge,name=TopicBridge" xmbean-dd="xmdesc/Bridge-xmbean.xml">
<depends optional-attribute-name="SourceProviderLoader">jboss.messaging:service=JMSProviderLoader,name=JMSProvider</depends>
<depends optional-attribute-name="TargetProviderLoader">jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider</depends>
<attribute name="SourceDestinationLookup">/LocalTopic</attribute>
<attribute name="TargetDestinationLookup">/RemoteTopic</attribute>
<attribute name="QualityOfServiceMode">2</attribute>
<attribute name="MaxBatchSize">5</attribute>
<attribute name="MaxBatchTime">1</attribute>
<attribute name="FailureRetryInterval">10000</attribute>
<attribute name="MaxRetries">-1</attribute>
<attribute name="AddMessageIDInHeader">false</attribute>
<attribute name="SourceUsername">bob</attribute>
<attribute name="SourcePassword">cheesecake</attribute>
<attribute name="TargetUsername">mary</attribute>
<attribute name="TargetPassword">hotdog</attribute>
<attribute name="SubName">localDurableSub1</attribute>
<attribute name="ClientID">clientNode2</attribute>
</mbean>
</server>
Testing Messaging Publish & Subscribe
接下來增加二支 Java Sample 測試 Publish & Subscribe Messaging。
-
新增
BridgeTopicSend.java
內容如下,測試 Publish 對 TOPIC:LocalTopic
發送訊息,如上面的說明 LocalTopic
是指 JBoss_Node-2
(Source Destination) 的設定, createTopicConnection 時設定 User: bob/cheesecake
(line-30
),設定 ClientID:clientNode2
(line-31
)。
import java.io.*;
import java.util.*;
import javax.transaction.*;
import javax.naming.*;
import javax.jms.*;
import javax.rmi.PortableRemoteObject;
public class BridgeTopicSend
{
public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
//*************** Connection Factory JNDI name *************************
public final static String CONN_FACTORY="/ConnectionFactory";
//*************** Topic JNDI name *************************
public final static String TOPIC="/LocalTopic";
protected TopicConnectionFactory tconFactory;
protected TopicConnection tcon;
protected TopicSession tsession;
protected TopicPublisher tpublisher;
protected Topic topic;
protected TextMessage msg;
public void init(Context ctx, String topicName) throws NamingException, JMSException
{
tconFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(ctx.lookup(CONN_FACTORY),TopicConnectionFactory.class);
//Subscription Durability
tcon = tconFactory.createTopicConnection("bob", "cheesecake");
tcon.setClientID("clientNode2");
tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic) PortableRemoteObject.narrow(ctx.lookup(topicName), Topic.class);
tpublisher = tsession.createPublisher(topic);
msg = tsession.createTextMessage();
tcon.start();
}
public void send(String message) throws JMSException {
msg.setText(message);
tpublisher.publish(msg);
}
public void close() throws JMSException {
tpublisher.close();
tsession.close();
tcon.close();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: java Bridge TopicSend URL");
return;
}
InitialContext ic = getInitialContext(args[0]);
BridgeTopicSend ts = new BridgeTopicSend();
ts.init(ic, TOPIC);
readAndSend(ts);
ts.close();
}
protected static void readAndSend(BridgeTopicSend ts)throws IOException, JMSException
{
BufferedReader msgStream = new BufferedReader (new InputStreamReader(System.in));
String line=null;
System.out.print("nt Bridge TopicSender(ClientID:clientNode2) Started ... Enter message (\"quit\" to quit): \n");
do {
System.out.print("Topic Sender Says > ");
line = msgStream.readLine();
if (line != null && line.trim().length() != 0) {
ts.send(line);
}
} while (line != null && ! line.equalsIgnoreCase("quit"));
}
protected static InitialContext getInitialContext(String url)
throws NamingException
{
Hashtable<String,String> env = new Hashtable<String,String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
env.put(Context.PROVIDER_URL, url);
env.put("org.jnp.interfaces.NamingContextFactory", "true");
return new InitialContext(env);
}
}
-
新增
BridgeTopicReceive.java
內容如下,測試 Subscriber 對 TOPIC:RemoteTopic
接收訊息,如上面的說明 RemoteTopic
是指 JBoss_Node-1
(Target Destination) 的設定, createTopicConnection 時設定 User: mary/hotdog
(line-51
),設定 ClientID:clientNode1
(line-52
),設定 DurableSubscriber:remoteDurableSub1
(line-56
) ,於第一次執行 BridgeTopicReceive.java
時 remoteDurableSub1
會被建立,之後只要有訊息傳送給 TOPIC:RemoteTopic
都可透過 remoteDurableSub1
接受訊息。
import java.io.*;
import java.util.*;
import javax.transaction.*;
import javax.naming.*;
import javax.jms.*;
import javax.rmi.PortableRemoteObject;
public class BridgeTopicReceive implements MessageListener {
public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
//*************** Connection Factory JNDI name *************************
public final static String CONN_FACTORY="/ConnectionFactory";
//*************** Topic JNDI name *************************
public final static String TOPIC="/RemoteTopic";
private TopicConnectionFactory tconFactory;
private TopicConnection tcon;
private TopicSession tsession;
private TopicSubscriber tsubscriber;
private Topic topic;
private boolean quit = false;
public void onMessage(Message msg)
{
try {
String msgText;
if (msg instanceof TextMessage) {
msgText = ((TextMessage)msg).getText();
} else {
msgText = msg.toString();
}
System.out.println("JMS Message Received: "+ msgText );
if (msgText.equalsIgnoreCase("quit")) {
synchronized(this) {
quit = true;
this.notifyAll();
}
}
} catch (JMSException jmse) {
System.err.println("An exception occurred: "+jmse.getMessage());
}
}
public void init(Context ctx, String topicName)throws NamingException, JMSException
{
tconFactory = (TopicConnectionFactory)PortableRemoteObject.narrow(ctx.lookup(CONN_FACTORY), TopicConnectionFactory.class);
//Subscription Durability
tcon = tconFactory.createTopicConnection("mary", "hotdog");
tcon.setClientID("clientNode1");
tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topic = (Topic) PortableRemoteObject.narrow(ctx.lookup(topicName), Topic.class);
tsubscriber = tsession.createDurableSubscriber(topic, "remoteDurableSub1");
tsubscriber.setMessageListener(this);
tcon.start();
}
public void close() throws JMSException {
tsubscriber.close();
tsession.close();
tcon.close();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: java examples.jms.topic.BridgeTopicReceive URL");
return;
}
InitialContext ic = getInitialContext(args[0]);
BridgeTopicReceive tr = new BridgeTopicReceive();
tr.init(ic, TOPIC);
System.out.println("JMS Ready To Receive Messages (ClientID:clientNode1,DurableSubscriber:remoteDurableSub1) (To quit, send a \"quit\" message).");
synchronized(tr) {
while (! tr.quit) {
try {
tr.wait();
} catch (InterruptedException ie) {}
}
}
tr.close();
}
private static InitialContext getInitialContext(String url) throws NamingException
{
Hashtable<String,String> env = new Hashtable<String,String>();
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
env.put(Context.PROVIDER_URL, url);
env.put("org.jnp.interfaces.NamingContextFactory", "true");
return new InitialContext(env);
}
}
-
如果先啟動
JBoss_Node-2
再啟動 JBoss_Node-1
,則 Node-2 會有 Bridge Failed 的訊息,直到 Node-1 啟動會有 Bridge Succeeded in connecting to servers 的訊息表示 Bridge 連結成功。
%JBOSS_HOME%\bin\run.bat
%JBOSS_HOME%\appService\bin\runAppService.bat
-
新增
run.bat
file,Compiler & Run
- BridgeTopicSend.java
、BridgeTopicReceive.java
。
@echo off
time /t
set JBOSS_HOEM=C:\jboss-eap-5.0\jboss-as
javac -cp $CLASSPATH;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicSend.java
rem java -cp $CLASSPATH;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicSend jnp://127.0.0.1:1199
rem javac -cp $CLASSPATH;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicReceive.java
rem java -cp $CLASSPATH;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicReceive jnp://127.0.0.1:1099
time /t
- 上述測試成功後,可再建立如下的摸擬測試。
- Source Destination
- JBoss_Node-2 =
appService
= 127.0.0.1:1199
- 比照第2點建立二個 Receive java file ,模擬二個 Durable Subscriber
- User 都使用
bob/cheesecake
, ClientID 都使用 clientNode2
- createDurableSubscriber 分別為:
localDurableSub2/localDurableSub3
,因 localDurableSub1
已經是 Bridge Service 在使用了
- Target Destination
- JBoss_Node-1 =
default
= 127.0.0.1:1099
- 比照第2點建立一個 Receive java file ,加上
BridgeTopicReceive.java
也是模擬二個 Durable Subscriber
- User 使用
mary/hotdog
, ClientID 使用 clientNode1
- createDurableSubscriber :
remoteDurableSub2
-
前一點的模擬環境主要是針對
JBoss_Node-1
、JBoss_Node-2
多個訂閱者的測試,測試的重點則為(以下的測試皆可正常運作):
- 多個訂閱者是否都可收到訊息
- 訂閱者於不同時間點上線是否可收到 Publish 發佈的完整訊息
JBoss_Node-1
無法運作期間 Publish 發佈的訊息,是否可於 JBoss_Node-1
啟動時,再透過 JBoss_Node-1
接收訊息
也可從 Admin Console
看到 Durable Message
、Durable Subscriptions
的數量,如下圖是JBoss_Node-2
的 Admin Console
畫面。
參考文章如下:
JBoss Messaging 1.4 User's Guide
How to create a Topic in JBoss AS 6 ?
Bridge cannot find session with QOS_ONCE_AND_ONLY_ONCE
A Pub-Sub With Durable Topic Example
How to create a durable subscriber for topic on Jboss 5?
Java Message Service 5.2 Tutorial
沒有留言:
張貼留言