2012年11月21日 星期三

Configure JBoss Messaging Bridge

JBoss Messaging Bridge 可以將 messages 從一台 JBoss Server 傳送到另一台 JBoss Server,也就是將 messages 傳送至甲地(Source Destination)而由乙地(Target Destination)接收 messages,Queue跟Topic 二種模式都支援。
以下的測試主要是參考 How to Configure JBoss Messaging Bridge in JBoss AS 5 ?

準備執行環境

因測試需要二台 JBoss Server 以新增 JBoss Nodes的方式來建立測試環境,新增 JBoss Nodes可以參考另一篇文章:JBoss EAP 5 Multi Apps Services
JBoss Messaging 1.4.6.GA 只是列出版本號碼,不需另外安裝,JBoss EAP 5.0 即包含 JBoss Messaging 。
  • JDK (Java Development Kit) version 1.6+
  • JBoss Enterprise Application Platform 5.0
  • JBoss Messaging 1.4.6.GA




JBoss Messaging Bridge sample explanation

  • Source Destination
  • 是用來接收 Sender 送出來的 message
  • JBoss_Node-2 = appService = 127.0.0.1:1199
  • JNDIName:TestQ
  • SourceDestinationLookup:/TestQ
  • Target Destination
  • 是用來接收 Source Destination 轉送過來的 message,Receiver由 Target 接收 message
  • JBoss_Node-1 = default = 127.0.0.1:1099
  • JNDIName:remoteTestQ
  • TargetDestinationLookup:/remoteLocation/remoteTestQ


JBoss Messaging Bridge Configuration

  1. JBoss_Node-1 新增 Queue Service 設定,新增 %JBOSS_HOME%\server\default\deploy\TestQueue-service.xml ,QueueName 設定為 remoteTestQ
  2. <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <server>
        <mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=remoteTestQ" code="org.jboss.jms.server.destination.QueueService">
            <attribute name="DLQ"> jboss.messaging.destination:name=DLQ,service=Queue</attribute>
            <attribute name="ExpiryQueue">jboss.messaging.destination:name=ExpiryQueue,service=Queue</attribute>
            <attribute name="JNDIName">remoteTestQ</attribute>
            <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
            <depends>jboss.messaging:service=PostOffice</depends>
        </mbean>
    </server> 
    

  3. JBoss_Node-2 新增 Remote Messaging Server 設定 ,對 JBoss_Node-2 來說 Remote Messaging Server 就是 JBoss_Node-1,新增 %JBOSS_HOME%\server\appService\deploy\remote-jms-ds.xml ,而 Server Name 設定為 127.0.0.1:1099
  4. <?xml version="1.0" encoding="UTF-8"?>
    <connection-factories>
      <mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider">
        <attribute name="ProviderName">RemoteJMSProvider</attribute>
        <attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
        <attribute name="FactoryRef">remoteLocation/ConnectionFactory</attribute>
        <attribute name="QueueFactoryRef">remoteLocation/ConnectionFactory</attribute>
        <attribute name="TopicFactoryRef">remoteLocation/ConnectionFactory</attribute>
        <depends>jboss.jndi:service=ExternalContext,jndiName=remoteLocation</depends>
      </mbean>
    
      <mbean code="org.jboss.naming.ExternalContext" name="jboss.jndi:service=ExternalContext,jndiName=remoteLocation">
        <attribute name="JndiName">remoteLocation</attribute>
        <attribute name="Properties">
          java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
          java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
          java.naming.provider.url=jnp://127.0.0.1:1099
        </attribute>
      </mbean>
    </connection-factories>
    

  5. JBoss_Node-2 新增 Queue Service 設定,新增 %JBOSS_HOME%\server\appService\deploy\TestQ-service.xml ,QueueName 設定為 TestQ
  6. <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
    <server>
        <mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=TestQ" code="org.jboss.jms.server.destination.QueueService">
            <attribute name="DLQ"> jboss.messaging.destination:name=DLQ,service=Queue</attribute>
            <attribute name="ExpiryQueue">jboss.messaging.destination:name=ExpiryQueue,service=Queue</attribute>
            <attribute name="JNDIName">TestQ</attribute>
            <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
            <depends>jboss.messaging:service=PostOffice</depends>
        </mbean>
    </server>
    

  7. JBoss_Node-2 新增 Messaging Bridge Service 設定,新增 %JBOSS_HOME%\server\appService\deploy\TestBridge-service.xml ,SourceDestinationLookup:/TestQ、TargetDestinationLookup:/remoteLocation/remoteTestQ,Target 的 /remoteLocation/remoteTestQ 其中的/remoteLocation 是 Remote Messaging Server 的設定。
  8. <?xml version="1.0" encoding="UTF-8"?>
    <server>
       <mbean code="org.jboss.jms.server.bridge.BridgeService" name="jboss.messaging:service=Bridge,name=TestBridge" xmbean-dd="xmdesc/Bridge-xmbean.xml">
       <depends optional-attribute-name="SourceProviderLoader">jboss.messaging:service=JMSProviderLoader,name=JMSProvider</depends>
       <depends optional-attribute-name="TargetProviderLoader">jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider</depends>
          <attribute name="SourceDestinationLookup">/TestQ</attribute>
          <attribute name="TargetDestinationLookup">/remoteLocation/remoteTestQ</attribute>
          <attribute name="QualityOfServiceMode">1</attribute>
          <attribute name="MaxBatchSize">5</attribute>
          <attribute name="MaxBatchTime">1</attribute>
          <attribute name="FailureRetryInterval">5000</attribute>
          <attribute name="MaxRetries">-1</attribute>
          <attribute name="AddMessageIDInHeader">false</attribute>
       </mbean>
    </server>
    



Testing Messaging Send & Receive


接下來增加二支 Java Sample 測試 Send & Receive Messaging。
  1. 新增 QueueSend.java 內容如下,測試 Sender 對 QUEUE:TestQ 發送訊息,如上面的說明 TestQ 是指 JBoss_Node-2(Source Destination) 的設定。
  2. import java.io.*;
    import java.util.Hashtable;
    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    
    public class QueueSend
    {
       public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
       //*************** Connection Factory JNDI name *************************
       public final static String JMS_FACTORY="/ConnectionFactory";
       //*************** Queue JNDI name *************************
       public final static String QUEUE="/TestQ";
    
       private QueueConnectionFactory qconFactory;
       private QueueConnection qcon;
       private QueueSession qsession;
       private QueueSender qsender;
       private Queue queue;
       private TextMessage msg;
    
       public void init(Context ctx, String queueName)throws NamingException, JMSException
       {
          qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
          qcon = qconFactory.createQueueConnection();
          qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
          queue = (Queue) ctx.lookup(queueName);
          qsender = qsession.createSender(queue);
          msg = qsession.createTextMessage();
          qcon.start();
       }
    
       public void send(String message) throws JMSException {
          msg.setText(message);
          qsender.send(msg);
       }
    
       public void close() throws JMSException {
          qsender.close();
          qsession.close();
          qcon.close();
       }
    
       public static void main(String[] args) throws Exception {
          if (args.length != 1) {
             System.out.println("Usage: java QueueSend URL");
             return;
          }
          InitialContext ic = getInitialContext(args[0]);
          QueueSend qs = new QueueSend();
          qs.init(ic, QUEUE);
          readAndSend(qs);
          qs.close();
       }
    
       private static void readAndSend(QueueSend qs) throws IOException, JMSException
       {
          String line="Test Message Body with counter = ";
          BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
          boolean readFlag=true;
          System.out.println("ntStart Sending Messages (Enter QUIT to Stop):n");
          while(readFlag)
          {
             System.out.print("<msg_sender> ");
             String msg=br.readLine();
             if(msg.equals("QUIT") || msg.equals("quit"))
             {
                qs.send(msg);
                System.exit(0);
             }
             qs.send(msg);
             System.out.println();
          }
          br.close();
       }
    
       private static InitialContext getInitialContext(String url) throws NamingException
       {
          Hashtable env = new Hashtable();
          env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
          env.put(Context.PROVIDER_URL, url);
          return new InitialContext(env);
       }
    }
    

  3. 新增 QueueReceive.java 內容如下,測試 Receiver 對 QUEUE:remoteTestQ 接收訊息,如上面的說明 remoteTestQ 是指 JBoss_Node-1(Target Destination) 的設定。
  4. import java.util.Hashtable;
    import javax.jms.*;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    
    public class QueueReceive implements MessageListener
    {
       public final static String JNDI_FACTORY="org.jnp.interfaces.NamingContextFactory";
    
       //*************** Connection Factory JNDI name *************************
       public final static String JMS_FACTORY="/ConnectionFactory";
    
       //*************** Connection Factory JNDI name *************************
       public final static String QUEUE="/remoteTestQ";
    
       private QueueConnectionFactory qconFactory;
       private QueueConnection qcon;
       private QueueSession qsession;
       private QueueReceiver qreceiver;
       private Queue queue;
       private boolean quit = false;
    
       public void onMessage(Message msg)
       {
          try {
             String msgText;
             if (msg instanceof TextMessage)
             {
                msgText = ((TextMessage)msg).getText();
             }
             else
             {
                msgText = msg.toString();
             }
             System.out.println("\n\t<Msg_Receiver> "+ msgText );
    
             if (msgText.equalsIgnoreCase("quit"))
             {
                synchronized(this)
             {
                quit = true;
                this.notifyAll(); // Notify main thread to quit
             }
             }
          }
          catch (JMSException jmse)
          {
             jmse.printStackTrace();
          }
       }
    
       public void init(Context ctx, String queueName) throws NamingException, JMSException
       {
          qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
          qcon = qconFactory.createQueueConnection();
          qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
          queue = (Queue) ctx.lookup(queueName);
          qreceiver = qsession.createReceiver(queue);
          qreceiver.setMessageListener(this);
          qcon.start();
       }
       
       public void close()throws JMSException
       {
          qreceiver.close();
          qsession.close();
          qcon.close();
       }
       
       public static void main(String[] args) throws Exception
       {
          if (args.length != 1)
          {
             System.out.println("Usage: java QueueReceive URL");
             return;
          }
          InitialContext ic = getInitialContext(args[0]);
          QueueReceive qr = new QueueReceive();
          qr.init(ic, QUEUE);
          System.out.println("JMS Ready To Receive Messages (To quit, send a \"quit\" message from QueueSender.class).");
          // Wait until a "quit" message has been received.
          synchronized(qr)
          {
             while (! qr.quit)
             {
                try
                {
                   qr.wait();
                }
                catch (InterruptedException ie)
                {}
             }
          }
          qr.close();
       }
       
       private static InitialContext getInitialContext(String url) throws NamingException
       {
          Hashtable env = new Hashtable();
          env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
          env.put(Context.PROVIDER_URL, url);
          return new InitialContext(env);
       }
    }
    

  5. 先啟動 JBoss_Node-1 再啟動 JBoss_Node-2,因 Node-2 的 Remote Messaging Server 設定有連接到 Node-1 所以啟動有先後順序。
  6. %JBOSS_HOME%\bin\run.bat
    %JBOSS_HOME%\appService\bin\runAppService.bat

  7. 新增 run.bat file,Compiler & Run - QueueSend.javaQueueReceive.java
  8. @echo off
    time /t
    
    set JBOSS_HOEM=C:\jboss-eap-5.0\jboss-as
    javac -cp $CLASSPATH;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueSend.java
    rem java -cp $CLASSPATH;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueSend jnp://127.0.0.1:1199
    
    
    rem javac -cp $CLASSPATH;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueReceive.java
    rem java -cp $CLASSPATH;.;%JBOSS_HOME%\client\jbossall-client.jar;%JBOSS_HOME%\client\jboss-javaee.jar;%JBOSS_HOME%\common\lib\jnpserver.jar;%JBOSS_HOME%\client\jboss-logging-spi.jar;%JBOSS_HOME%\lib\jboss-javaee.jar;%JBOSS_HOME%\common\lib\servlet-api.jar;%JBOSS_HOME%\common\lib\log4j.jar QueueReceive jnp://127.0.0.1:1099
    
    time /t

  9. 執行結果如下,此 Sample 有一點小問題,Sender 要結束輸入 quit 會連 Receiver 一起結束,因 Receiver 也是判斷接收的字串來決定是否結束,但並不影响測試的目的。
  10. 輸入訊息傳送到 JBoss_Node-2

    JBoss_Node-1 接收訊息

  11. 另外也可以對 JBoss_Node-2QUEUE:TestQ 接收訊息,但因 QUEUE 的行為只能被一個 Receiver 接收,因此如果被 QUEUE:TestQ 接收訊息則 QUEUE:remoteTestQ 無法再接收訊息,依測試的結果二個 Receiver 都開啟同時 Listener ,會輪流收到訊息,這是測試10次左右的觀察結果。



Persistent Messages mode Configuration(2013.01.24)


這部份是後續新增的,Messaging Bridge 可透過設定 Quality of Service (QoS) Level 來達到 Persistent MessagesQoS 共有三個設定:QOS_AT_MOST_ONCE、QOS_DUPLICATES_OK、QOS_ONCE_AND_ONLY_ONCE(設定值依序為0~2,詳見官網說明),文章開頭的 Sample 是使用 QOS_DUPLICATES_OK mode 依官網的說明這個 mode ,Target Destination 有可能會收到重覆的資料但確保訊息都會收到, QOS_ONCE_AND_ONLY_ONCE mode 則確保會收到一次,此 mode 是架構在 JTA transaction 因此配置上有相對應的修改,而以下的設定都是在 JBoss_Node-2
  1. 修改 %JBOSS_HOME%\server\appService\deploy\messaging\messaging-service.xml (以下為部份的節錄), 修改 ServerPeerID 參數如 xml 上的說明,每個 node 都必須是唯一的,此 sample 有二個 JBoss Node 而 JBoss_Node-1 不做修改,因此將 ${jboss.messaging.ServerPeerID:0} 改為 ${jboss.messaging.ServerPeerID:1},修改後存檔。
  2. <?xml version="1.0" encoding="UTF-8"?>
    
    <!--
         The JBoss Messaging service deployment descriptor.
    
         $Id: messaging-service.xml 85945 2009-03-16 19:45:12Z dimitris@jboss.org $
     -->
    
    <server>
    
       <!-- ServerPeer MBean configuration
            ============================== -->
    
        <mbean code="org.jboss.jms.server.ServerPeer"
            name="jboss.messaging:service=ServerPeer"
            xmbean-dd="xmdesc/ServerPeer-xmbean.xml">
    
            <!-- The unique id of the server peer - in a cluster each node MUST have a unique value - must be an integer -->
    
            <attribute name="ServerPeerID">${jboss.messaging.ServerPeerID:1}</attribute>
    
    ....
    
        </mbean>
    
    </server>
    

  3. 修改 %JBOSS_HOME%\server\appService\deploy\remote-jms-ds.xml ,而 lien 6~8 都改設定為 XAConnectionFactory ,於官網的說明是採用 QOS_ONCE_AND_ONLY_ONCE mode 必須設定 XAConnectionFactory ,才會有 XA recovery with JBoss Transactions 的功能,而依實際測試如果設定不改為 XAConnectionFactory ,則佈署及執行都不會有問題,但 JBoss_Node-1 如果 restart , JBoss_Node-2 的 Bridge 無法重新連線 JBoss_Node-1 ,必須 JBoss_Node-2 也 restart 才能重新建立連線; JBoss_Node-1 Remote 連線改為 Properties 設定(line 9 ~ 13)。
  4. <?xml version="1.0" encoding="UTF-8"?>
    <connection-factories>
      <mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider">
        <attribute name="ProviderName">RemoteJMSProvider</attribute>
        <attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
        <attribute name="FactoryRef">/XAConnectionFactory</attribute>
        <attribute name="QueueFactoryRef">/XAConnectionFactory</attribute>
        <attribute name="TopicFactoryRef">/XAConnectionFactory</attribute>
        <attribute name="Properties">
          java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
          java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
          java.naming.provider.url=jnp://127.0.0.1:1099
        </attribute>
      </mbean>
    </connection-factories>
    

  5. 修改 %JBOSS_HOME%\server\appService\deploy\TestBridge-service.xml ,將 QualityOfServiceMode 設定為 2 也就是 QOS_ONCE_AND_ONLY_ONCE mode ,而前一點有提到關於 XAConnectionFactory 的設定, RemoteJMSProvider 已於前一點設定完成,至於 JMSProvider 則可參考 %JBOSS_HOME%\server\appService\deploy\messaging\jms-ds.xml 預設也已設定為 XAConnectionFactory (附註一點: java: 開頭表示為local的設定);因 remote-jms-ds.xmlremoteLocation JndiName 改設定了,所以 TargetDestinationLookup 也改為 /remoteTestQ
  6. <?xml version="1.0" encoding="UTF-8"?>
    <server>
       <mbean code="org.jboss.jms.server.bridge.BridgeService" name="jboss.messaging:service=Bridge,name=TestBridge" xmbean-dd="xmdesc/Bridge-xmbean.xml">
       <depends optional-attribute-name="SourceProviderLoader">jboss.messaging:service=JMSProviderLoader,name=JMSProvider</depends>
       <depends optional-attribute-name="TargetProviderLoader">jboss.messaging:service=JMSProviderLoader,name=RemoteJMSProvider</depends>
          <attribute name="SourceDestinationLookup">/TestQ</attribute>
          <attribute name="TargetDestinationLookup">/remoteTestQ</attribute>
          <attribute name="QualityOfServiceMode">2</attribute>
          <attribute name="MaxBatchSize">5</attribute>
          <attribute name="MaxBatchTime">1</attribute>
          <attribute name="FailureRetryInterval">5000</attribute>
          <attribute name="MaxRetries">-1</attribute>
          <attribute name="AddMessageIDInHeader">false</attribute>
       </mbean>
    </server>
    

  7. 以上三點修改後即可依照之前的步驟進行測試,測試的重點則是 JBoss_Node-2JBoss_Node-1 的訊息不會有遺漏的情況,以下畫面為 JBoss_Node-1 restart 時在 JBoss_Node-2 的 log,可看到 JBoss_Node-1 已啟動後 Bridge reconnecting 的訊息 (retry 時間的設定則參考 TestBridge-service.xml - FailureRetryInterval 參數);至於傳送到 JBoss_Node-2 的訊息如何確保送達則是 Sender 該負責的議題不在此討論。



參考文章如下:
How to Configure JBoss Messaging Bridge in JBoss AS 5 ?
JBoss Messaging 2.0 Quickstart Guide
Spring Gossip: 訊息(Message)觀念
Jboss JMS Bridge (Jboss to Jboss)

沒有留言:

張貼留言