Thursday, 14 August 2014

JBoss EAP JMS Duplicate Message Detection

Objective

To demonstrate how the HornetQ duplicate message detection feature lets the server automatically detect and discard duplicate messages.

Environment

Eclipse, JDK 6, JBoss EAP 6.0


Development


Eclipse JBoss EAP Server Runtime Setup for Standalone-full-ha profile

=> Eclipse Menu:  Window => Preferences => Server => Runtime Environments





MDB Project Setup


=> Use Eclipse Menu { File => New => EJB Project  } to create an EJB Project named “HelloMDB”

=> Then use Menu { File => New => Message Driven Bean } to create the HelloMDB message-driven bean


=> The source code for HelloMDB:

package com.mahesh;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message-Driven Bean implementation class for: HelloMDB
 */
@MessageDriven(activationConfig = {
                                                @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                                                @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/helloMDB") })
public class HelloMDB implements MessageListener {

                        /**
                         * @see MessageListener#onMessage(Message)
                         */
                        public void onMessage(Message message) {
                                                try {
                                                                        System.out.println("Inside HelloMDB");
                                                                        System.out.println("Message: " + ((TextMessage) message).getText()
                                                                                                                        + " with ID: " + message.getJMSMessageID());
                                                } catch (JMSException e) {
                                                                        e.printStackTrace();
                                                }
                        }
}

Servlet Project Setup


=> Use Eclipse Menu { File => New => Dynamic Web Project  } to create a Web Project named “TestServlet”

=> Then use Menu { File => New => Servlet } to create the TestServlet servlet


=> The source code for TestServlet: (Remember that you will have to add the JBoss HornetQ core jar to the classpath:
jboss-eap-6.0\modules\org\hornetq\main\hornetq-core-2.2.16.Final-redhat-1.jar)

package com.mahesh;

import java.io.IOException;
import java.io.PrintWriter;

import javax.annotation.Resource;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class TestServlet
 */
@WebServlet("/TestServlet")
public class TestServlet extends HttpServlet {

                @Resource(mappedName = "java:/ConnectionFactory")
                private ConnectionFactory qconFactory;

                @Resource(mappedName = "java:/queue/helloMDB")
                private Queue queue;

                /**
                 * @see HttpServlet#HttpServlet()
                 */
                public TestServlet() {
                                super();
                }

                private static final long serialVersionUID = -8314035702649252239L;

                @Override
                protected void doGet(HttpServletRequest req, HttpServletResponse resp)
                                                throws ServletException, IOException {
                                resp.setContentType("text/html");
                                PrintWriter out = resp.getWriter();
                                try {
                                                out.write("<p>Sending messages to <em>helloMDB</em></p>");
                                                sendMessage("A Sample Message", "A Unique ID");
                                                out.write("<p>Message sent: A Sample Message</p>");
                                                out.write("<br/>");
                                                out.write("<p><i>Go to your JBoss Application Server console or Server log to see the result of messages processing</i></p>");
                                } catch (Exception e) {
                                                e.printStackTrace();
                                                out.write("<h2>A problem occurred during the delivery of this message</h2>");
                                                out.write("<br/>");
                                                out.write("<p><i>Go to your JBoss Application Server console or Server log to see the error stack trace</i></p>");
                                }
                }

                protected void doPost(HttpServletRequest req, HttpServletResponse resp)
                                                throws ServletException, IOException {
                                doGet(req, resp);
                }

                private void sendMessage(final String message, final String uniqueMessageID)
                                                throws Exception {
                                Connection qcon = null;
                                try {
                                                qcon = qconFactory.createConnection();
                                                Session qsession = qcon.createSession(false,
                                                                                Session.AUTO_ACKNOWLEDGE);
                                                MessageProducer qsender = qsession.createProducer(queue);
                                                TextMessage txtMsg = qsession.createTextMessage(message);
                                                // org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID gives
                                                // a dependency error, so the hardcoded value _HQ_DUPL_ID is used
                                                txtMsg.setStringProperty("_HQ_DUPL_ID", uniqueMessageID);
                                                qsender.send(txtMsg);
                                                System.out.println("Message sent: " + txtMsg);
                                } finally {
                                                // Close the connection exactly once; any exception
                                                // propagates to doGet, which reports the failure
                                                if (qcon != null) {
                                                                qcon.close();
                                                }
                                }
                }

}
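
=> The servlet above sends the fixed string "A Unique ID" with every request, which is exactly why the second request is treated as a duplicate. In a real sender the ID would normally differ per logical message but stay the same when that message is re-sent. The following is only a minimal sketch of one way to derive such an ID; the class DuplicateIdSketch, the method duplicateIdFor and the idea of a "business key" (for example an order number) are assumptions made for illustration and are not part of the original project or of HornetQ.

```java
import java.nio.charset.Charset;
import java.util.UUID;

/**
 * Hypothetical helper (not part of the original project): derives a stable
 * duplicate-detection ID from a business key, so that a re-send of the same
 * logical message carries the same ID.
 */
class DuplicateIdSketch {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Returns the same ID string every time it is given the same business key. */
    static String duplicateIdFor(String businessKey) {
        // Name-based UUID: deterministic for identical input bytes
        return UUID.nameUUIDFromBytes(businessKey.getBytes(UTF8)).toString();
    }

    public static void main(String[] args) {
        // "order-1001" is a made-up business key used only for this demo
        String first = duplicateIdFor("order-1001");
        String retry = duplicateIdFor("order-1001");
        System.out.println("Same ID on retry: " + first.equals(retry));
    }
}
```

With a helper like this, the servlet could call sendMessage("A Sample Message", DuplicateIdSketch.duplicateIdFor(someKey)) instead of passing a constant, where someKey is whatever identifies the message in your application.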

Testing


=> { Right Click on Project HelloMDB  => Run As => Run On Server} and click { Finish }


=> { Right Click on Project TestServlet  => Run As => Run On Server} and click { Finish }

=> Open a web browser and request the URL http://localhost:8080/TestServlet/TestServlet twice, so that two messages with the same duplicate ID are sent


=> The MDB processing can be checked in EAP server log (jboss-eap-6.0\standalone\log\server.log)

18:13:36,572 INFO  [org.jboss.as.server] (ServerService Thread Pool -- 40) JBAS018559: Deployed "HelloMDB.jar"
……..
……..
18:13:50,943 INFO  [org.jboss.as.server] (DeploymentScanner-threads - 1) JBAS018559: Deployed "TestServlet.war"
………
………
18:14:04,610 INFO  [stdout] (http-localhost/127.0.0.1:8080-1) Message sent: HornetQMessage[ID:82407695-22e7-11e4-8ea2-279e943ef5a0]:PERSISTENT
18:14:04,660 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-2040637942)) Inside HelloMDB
18:14:04,660 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-2040637942)) Message: A Sample Message with ID: ID:82407695-22e7-11e4-8ea2-279e943ef5a0
……..
……..
18:15:32,059 WARN  [org.hornetq.core.postoffice.impl.PostOfficeImpl] (Thread-3 (HornetQ-remoting-threads-HornetQServerImpl::serverUUID=10f71dee-22db-11e4-ac94-a7609448202b-1647507571-5601379)) Duplicate message detected - message will not be routed. Message information:ServerMessage[messageID=2147483672,priority=4, bodySize=1500,expiration=0, durable=true, address=jms.queue.queue/helloMDB,properties=TypedProperties[{_HQ_DUPL_ID=A Unique ID}]]@873503274
18:15:32,060 INFO  [stdout] (http-localhost/127.0.0.1:8080-1) Message sent: HornetQMessage[ID:b66157ab-22e7-11e4-8ea2-279e943ef5a0]:PERSISTENT

=> Notice that the first message is successfully received by the MDB (Inside HelloMDB), while the second message never reaches the MDB: the server detects it as a duplicate (Duplicate message detected) and discards it instead of routing it to the queue.

Things to Remember


=> The server maintains a cache of duplicate IDs. The maximum cache size is set by the parameter id-cache-size, and the default value is 2000 elements.

=> Whether the cache is also persisted to disk is set by the parameter persist-id-cache. The default value is true, which means the cache is kept on disk.

=> To change these parameters, use jboss-cli and navigate to /subsystem=messaging/hornetq-server=default/
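
=> To make the effect of id-cache-size easier to picture, here is a small conceptual sketch of a fixed-size cache of duplicate IDs in which the oldest ID is dropped once the cache is full. It is not HornetQ's implementation; the class DuplicateIdCacheSketch and its accept method are hypothetical names, and the eviction order is an assumption based on the cache being described as circular with a fixed size.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/**
 * Conceptual sketch only (NOT HornetQ code): a bounded cache of duplicate IDs.
 * maxSize plays the role that id-cache-size plays on the server.
 */
class DuplicateIdCacheSketch {

    private final int maxSize;
    private final Deque<String> insertionOrder = new ArrayDeque<String>();
    private final Set<String> cachedIds = new HashSet<String>();

    DuplicateIdCacheSketch(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    /** Returns true if the message would be routed, false if it is a duplicate. */
    boolean accept(String duplicateId) {
        if (cachedIds.contains(duplicateId)) {
            return false; // ID still in the cache: treat as duplicate
        }
        if (insertionOrder.size() == maxSize) {
            // Cache is full: forget the oldest ID to make room
            cachedIds.remove(insertionOrder.removeFirst());
        }
        insertionOrder.addLast(duplicateId);
        cachedIds.add(duplicateId);
        return true;
    }

    public static void main(String[] args) {
        DuplicateIdCacheSketch cache = new DuplicateIdCacheSketch(2);
        System.out.println(cache.accept("A Unique ID")); // first send
        System.out.println(cache.accept("A Unique ID")); // second send, same ID
    }
}
```

The practical point of the sketch: an ID is only recognised as a duplicate while it is still in the cache, so the cache size should be chosen with the expected gap between a message and its possible re-send in mind.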




=> The source code can be downloaded from Git location:  https://github.com/mchopker/myprojects/tree/master/HornetQDuplicateMessage

Thank You!
