2013年1月29日 星期二

Configure persistent Topic in JBoss Messaging Bridge

Configure JBoss Messaging Bridge 說明的是 Queue 的傳送模式,接下來延續的這一篇改用 Topic 的傳送模式並且須為 Persistent Messages ,而前一篇關於 Persistent Messages for Queue 的說明是在進行這一篇測試時,做為先期測試完成後再補上的,因主題上比較相近皆為 Queue 的說明因此擺在同一篇說明裡。

Messaging Bridge 可透過設定 Quality of Service (QoS) Level 來達到 Persistent MessagesQoS 共有三個設定:QOS_AT_MOST_ONCE、QOS_DUPLICATES_OK、QOS_ONCE_AND_ONLY_ONCE(設定值依序為0~2,詳見官網說明), QOS_DUPLICATES_OK mode 依官網的說明這個 mode ,Target Destination 有可能會收到重覆的資料但確保訊息都會收到, QOS_ONCE_AND_ONLY_ONCE mode 則確保會收到一次,此 mode 是架構在 JTA transaction 因此配置上要有相對應的修改(這一段節錄自 Configure JBoss Messaging Bridge) 。

準備執行環境

  • JDK (Java Development Kit) version 1.6+
  • JBoss Enterprise Application Platform 5.0
  • JBoss Messaging 1.4.6.GA
  • MySQL Server 5.1




JBoss Messaging Bridge for Topic sample explanation


Queue 的傳送模式為點對點,因此 Source/Target 的說明適合 Queue mode ,但 Topic 可以有多個 Subscriber ,也就是說 Source/Target 二地的 Topic 都可以有多個 Subscriber ,不過還是維持這種說明方式,方便設定上的說明。
  • Source Destination
  • 是用來接收 Publisher 送出來的 message
  • JBoss_Node-2 = appService = 127.0.0.1:1199
  • JNDIName:LocalTopic
  • SourceDestinationLookup:/LocalTopic
  • Target Destination
  • 是用來接收 Source Destination 轉送過來的 message,Subscriber 由 Target 接收 message
  • JBoss_Node-1 = default = 127.0.0.1:1099
  • JNDIName:RemoteTopic
  • TargetDestinationLookup:/RemoteTopic


JBoss Messaging Bridge for Topic Configuration

  1. XA recovery with JBoss Transactions 的設定,參考 Configure JBoss Messaging Bridge 關於 Persistent Messages 的說明,來修改 %JBOSS_HOME%\server\appService\deploy\messaging\messaging-service.xml%JBOSS_HOME%\server\appService\deploy\remote-jms-ds.xml 這二個設定檔, messaging-service.xml 設定 ServerPeerIDremote-jms-ds.xml 設定 XAConnectionFactory

  2. JBoss_Node-1 新增 Topic Service 設定,新增 %JBOSS_HOME%\server\default\deploy\RemoteTopic-service.xml ,TopicName 設定為 RemoteTopic ,設定 SecurityConfigsubscriber role 具有 read/write/create 的權限, read/write 是指對 Message ,create 是指對 Durable Subscriptions , non Durable Subscriptions 則不需要有 Create 的動作。
  3. <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <server>
      <mbean xmbean-dd="xmdesc/Topic-xmbean.xml" name="jboss.messaging.destination:service=Topic,name=RemoteTopic" code="org.jboss.jms.server.destination.TopicService">
        <attribute name="JNDIName">RemoteTopic</attribute>
        <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
        <depends>jboss.messaging:service=PostOffice</depends>
        <attribute name="Clustered">true</attribute>
        <attribute name="SecurityConfig">
          <security>
            <role name="subscriber" read="true" write="true" create="true"/>
          </security>
        </attribute>
      </mbean>
    </server>
    

    JBoss_Node-1DataSource 設定 Role-User MappingUsername/Password , 於 jbm_role Table 新增 subscriber/maryjbm_user Table 新增 mary/hotdog ,接下來會使用 User:mary ,而 RemoteTopicSecurityConfig 也配合設定 Role:subscriber ;而其他預設的 Role-User Mapping 可參考 %JBOSS_HOME%\server\default\deploy\messaging\mysql-persistence-service.xml

  4. JBoss_Node-2 新增 Topic Service 設定,新增 %JBOSS_HOME%\server\appService\deploy\LocalTopic-service.xml ,TopicName 設定為 LocalTopic ,設定 SecurityConfigdurpublisher role。
  5. <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <server>
      <mbean xmbean-dd="xmdesc/Topic-xmbean.xml" name="jboss.messaging.destination:service=Topic,name=LocalTopic" code="org.jboss.jms.server.destination.TopicService">
        <attribute name="JNDIName">LocalTopic</attribute>
        <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
        <depends>jboss.messaging:service=PostOffice</depends>
        <attribute name="Clustered">true</attribute>
        <attribute name="SecurityConfig">
          <security>
            <role name="durpublisher" read="true" write="true" create="true"/>
          </security>
        </attribute>
      </mbean>
    </server>
    

    比照前一點於 JBoss_Node-2 設定 Role-User MappingUsername/Password , 於 jbm_role Table 新增 durpublisher/bobjbm_user Table 新增 bob/cheesecake ,接下來會使用 User:bob ,而 LocalTopicSecurityConfig 也配合設定 Role:durpublisher

  6. JBoss_Node-2 新增 Topic Messaging Bridge Service 設定,新增 %JBOSS_HOME%\server\appService\deploy\TopicBridge-service.xml
  7. BridgeService Name:TopicBridge
  8. SourceDestinationLookup:/LocalTopic、TargetDestinationLookup:/RemoteTopic
  9. QualityOfServiceMode 設定為 2 也就是 QOS_ONCE_AND_ONLY_ONCE mode。
  10. SourceUsername/SourcePassword 設定第3點於 JBoss_Node-2 新增的 User: bob/cheesecakeTargetUsername/TargetPassword 設定第2點於 JBoss_Node-1 新增的 User: mary/hotdog
  11. 如不新增 User 也可直接設定 User:guest ,但無法使用 User:john 因 ClientID 已設定為 DurableSubscriberExample ,而這裡為了測試說明因此設定不同的 User。
  12. ClientID:clientNode2、SubName:localDurableSub1 ,SubName - Durable Subscription Name:長期訂閱者的名稱,以 ClientID + SubscriptionName 來識別訂閱者,可以用同一個 ClientID 搭配不同的 Subscription ,或是一對一的搭配。
  13. 只要 Subscriber 不給定一個名稱就是 Non Durable Subscriptions(非長期訂閱),而非長期訂閱要是訂閱者不在線上的期間會遺失訊息,因此如果要設定為 Persistent MessagesClientID/SubName 都須要設定,也就是說 BridgeService 是使用 localDurableSub1 在接收訊息,再將接收的訊息傳給 JBoss_Node-1 ,因此如不設定 SubscriptionJBoss_Node-1 Stop 期間會遺失訊息。
  14. <?xml version="1.0" encoding="UTF-8"?>
    <server>
      <mbean code="org.jboss.jms.server.bridge.BridgeService" name="jboss.messaging:service=Bridge,name=TopicBridge" xmbean-dd="xmdesc/Bridge-xmbean.xml">
        <depends optional-attribute-name="SourceProviderLoader">jboss.messaging:service=JMSProviderLoader,name=JMSProvider</depends>
        <depends optional-attribute-name="TargetProviderLoader">jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider</depends>
        <attribute name="SourceDestinationLookup">/LocalTopic</attribute>
        <attribute name="TargetDestinationLookup">/RemoteTopic</attribute>
        <attribute name="QualityOfServiceMode">2</attribute>
        <attribute name="MaxBatchSize">5</attribute>
        <attribute name="MaxBatchTime">1</attribute>
        <attribute name="FailureRetryInterval">10000</attribute>
        <attribute name="MaxRetries">-1</attribute>
        <attribute name="AddMessageIDInHeader">false</attribute>
        <attribute name="SourceUsername">bob</attribute>
        <attribute name="SourcePassword">cheesecake</attribute>
        <attribute name="TargetUsername">mary</attribute>
        <attribute name="TargetPassword">hotdog</attribute>
        <attribute name="SubName">localDurableSub1</attribute>
        <attribute name="ClientID">clientNode2</attribute>
      </mbean>
    </server>
    



Testing Messaging Publish & Subscribe


接下來增加二支 Java Sample 測試 Publish & Subscribe Messaging。
  1. 新增 BridgeTopicSend.java 內容如下,測試 Publish 對 TOPIC:LocalTopic 發送訊息,如上面的說明 LocalTopic 是指 JBoss_Node-2(Source Destination) 的設定, createTopicConnection 時設定 User: bob/cheesecake(line-30),設定 ClientID:clientNode2(line-31)。
  2. import java.io.*;
    import java.util.*;
    import javax.transaction.*;
    import javax.naming.*;
    import javax.jms.*;
    import javax.rmi.PortableRemoteObject;
    
    public class BridgeTopicSend
    {
       public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
    
       //*************** Connection Factory JNDI name *************************
       public final static String CONN_FACTORY="/ConnectionFactory";
    
       //*************** Topic JNDI name *************************
       public final static String TOPIC="/LocalTopic";
    
       protected TopicConnectionFactory tconFactory;
       protected TopicConnection tcon;
       protected TopicSession tsession;
       protected TopicPublisher tpublisher;
       protected Topic topic;
       protected TextMessage msg;
    
       public void init(Context ctx, String topicName) throws NamingException, JMSException
       {
          tconFactory = (TopicConnectionFactory) PortableRemoteObject.narrow(ctx.lookup(CONN_FACTORY),TopicConnectionFactory.class);
    
          //Subscription Durability
          tcon = tconFactory.createTopicConnection("bob", "cheesecake");
          tcon.setClientID("clientNode2");
    
          tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
          topic = (Topic) PortableRemoteObject.narrow(ctx.lookup(topicName), Topic.class);
          tpublisher = tsession.createPublisher(topic);
          msg = tsession.createTextMessage();
          tcon.start();
       }
    
       public void send(String message) throws JMSException {
          msg.setText(message);
          tpublisher.publish(msg);
       }
    
       public void close() throws JMSException {
          tpublisher.close();
          tsession.close();
          tcon.close();
       }
    
       public static void main(String[] args) throws Exception {
          if (args.length != 1) {
             System.out.println("Usage: java Bridge TopicSend URL");
             return;
          }
          InitialContext ic = getInitialContext(args[0]);
          BridgeTopicSend ts = new BridgeTopicSend();
          ts.init(ic, TOPIC);
          readAndSend(ts);
          ts.close();
       }
    
       protected static void readAndSend(BridgeTopicSend ts)throws IOException, JMSException
       {
          BufferedReader msgStream = new BufferedReader (new InputStreamReader(System.in));
          String line=null;
          System.out.print("nt Bridge TopicSender(ClientID:clientNode2) Started ... Enter message (\"quit\" to quit): \n");
          do {
             System.out.print("Topic Sender Says > ");
             line = msgStream.readLine();
             if (line != null && line.trim().length() != 0) {
                ts.send(line);
             }
          } while (line != null && ! line.equalsIgnoreCase("quit"));
       }
    
       protected static InitialContext getInitialContext(String url)
          throws NamingException
       {
          Hashtable<String,String> env = new Hashtable<String,String>();
          env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
          env.put(Context.PROVIDER_URL, url);
          env.put("org.jnp.interfaces.NamingContextFactory", "true");
          return new InitialContext(env);
       }
    }
    

  3. 新增 BridgeTopicReceive.java 內容如下,測試 Subscriber 對 TOPIC:RemoteTopic 接收訊息,如上面的說明 RemoteTopic 是指 JBoss_Node-1(Target Destination) 的設定, createTopicConnection 時設定 User: mary/hotdog(line-51),設定 ClientID:clientNode1(line-52),設定 DurableSubscriber:remoteDurableSub1(line-56) ,於第一次執行 BridgeTopicReceive.javaremoteDurableSub1 會被建立,之後只要有訊息傳送給 TOPIC:RemoteTopic 都可透過 remoteDurableSub1 接受訊息。
  4. import java.io.*;
    import java.util.*;
    import javax.transaction.*;
    import javax.naming.*;
    import javax.jms.*;
    import javax.rmi.PortableRemoteObject;
    
    public class BridgeTopicReceive implements MessageListener {
    
       public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
    
       //*************** Connection Factory JNDI name *************************
       public final static String CONN_FACTORY="/ConnectionFactory";
    
       //*************** Topic JNDI name *************************
       public final static String TOPIC="/RemoteTopic";
    
       private TopicConnectionFactory tconFactory;
       private TopicConnection tcon;
       private TopicSession tsession;
       private TopicSubscriber tsubscriber;
       private Topic topic;
       private boolean quit = false;
    
       public void onMessage(Message msg)
       {
          try {
             String msgText;
             if (msg instanceof TextMessage) {
                msgText = ((TextMessage)msg).getText();
             } else {
                msgText = msg.toString();
             }
             System.out.println("JMS Message Received: "+ msgText );
             if (msgText.equalsIgnoreCase("quit")) {
                synchronized(this) {
                   quit = true;
                   this.notifyAll();
                }
             }
          } catch (JMSException jmse) {
             System.err.println("An exception occurred: "+jmse.getMessage());
          }
       }
    
       public void init(Context ctx, String topicName)throws NamingException, JMSException
       {
          tconFactory = (TopicConnectionFactory)PortableRemoteObject.narrow(ctx.lookup(CONN_FACTORY), TopicConnectionFactory.class);
    
          //Subscription Durability
          tcon = tconFactory.createTopicConnection("mary", "hotdog");
          tcon.setClientID("clientNode1");
    
          tsession = tcon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
          topic = (Topic) PortableRemoteObject.narrow(ctx.lookup(topicName), Topic.class);
          tsubscriber = tsession.createDurableSubscriber(topic, "remoteDurableSub1");
          tsubscriber.setMessageListener(this);
          tcon.start();
       }
    
       public void close() throws JMSException {
          tsubscriber.close();
          tsession.close();
          tcon.close();
       }
    
       public static void main(String[] args) throws Exception {
          if (args.length != 1) {
             System.out.println("Usage: java examples.jms.topic.BridgeTopicReceive URL");
             return;
          }
          InitialContext ic = getInitialContext(args[0]);
          BridgeTopicReceive tr = new BridgeTopicReceive();
          tr.init(ic, TOPIC);
          System.out.println("JMS Ready To Receive Messages (ClientID:clientNode1,DurableSubscriber:remoteDurableSub1) (To quit, send a \"quit\" message).");
          synchronized(tr) {
             while (! tr.quit) {
                try {
                   tr.wait();
                } catch (InterruptedException ie) {}
             }
          }
          tr.close();
       }
    
       private static InitialContext getInitialContext(String url) throws NamingException
       {
          Hashtable<String,String> env = new Hashtable<String,String>();
          env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
          env.put(Context.PROVIDER_URL, url);
          env.put("org.jnp.interfaces.NamingContextFactory", "true");
          return new InitialContext(env);
       }
    }
    

  5. 如果先啟動 JBoss_Node-2 再啟動 JBoss_Node-1,則 Node-2 會有 Bridge Failed 的訊息,直到 Node-1 啟動會有 Bridge Succeeded in connecting to servers 的訊息表示 Bridge 連結成功。
  6. %JBOSS_HOME%\bin\run.bat
    %JBOSS_HOME%\appService\bin\runAppService.bat

  7. 新增 run.bat file,Compiler & Run - BridgeTopicSend.javaBridgeTopicReceive.java
  8. @echo off
    time /t
    
    set JBOSS_HOEM=C:\jboss-eap-5.0\jboss-as
    javac -cp $CLASSPATH;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicSend.java
    rem java -cp $CLASSPATH;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicSend jnp://127.0.0.1:1199
    
    
    rem javac -cp $CLASSPATH;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicReceive.java
    rem java -cp $CLASSPATH;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar BridgeTopicReceive jnp://127.0.0.1:1099
    
    time /t

  9. 上述測試成功後,可再建立如下的摸擬測試。
    • Source Destination
    • JBoss_Node-2 = appService = 127.0.0.1:1199
    • 比照第2點建立二個 Receive java file ,模擬二個 Durable Subscriber
    • User 都使用 bob/cheesecake, ClientID 都使用 clientNode2
    • createDurableSubscriber 分別為:localDurableSub2/localDurableSub3 ,因 localDurableSub1 已經是 Bridge Service 在使用了
    • Target Destination
    • JBoss_Node-1 = default = 127.0.0.1:1099
    • 比照第2點建立一個 Receive java file ,加上 BridgeTopicReceive.java 也是模擬二個 Durable Subscriber
    • User 使用 mary/hotdog, ClientID 使用 clientNode1
    • createDurableSubscriber :remoteDurableSub2

  10. 前一點的模擬環境主要是針對 JBoss_Node-1JBoss_Node-2 多個訂閱者的測試,測試的重點則為(以下的測試皆可正常運作):
  11. 多個訂閱者是否都可收到訊息
  12. 訂閱者於不同時間點上線是否可收到 Publish 發佈的完整訊息
  13. JBoss_Node-1 無法運作期間 Publish 發佈的訊息,是否可於 JBoss_Node-1 啟動時,再透過 JBoss_Node-1 接收訊息

  14. 也可從 Admin Console 看到 Durable MessageDurable Subscriptions 的數量,如下圖是JBoss_Node-2Admin Console 畫面。



參考文章如下:
JBoss Messaging 1.4 User's Guide
How to create a Topic in JBoss AS 6 ?
Bridge cannot find session with QOS_ONCE_AND_ONLY_ONCE
A Pub-Sub With Durable Topic Example
How to create a durable subscriber for topic on Jboss 5?
Java Message Service 5.2 Tutorial

沒有留言:

張貼留言