Objective
To demonstrate how the HornetQ duplicate message detection feature lets the server automatically detect and discard duplicate messages.
Environment
Eclipse, JDK 6, JBoss EAP 6.0
Development
Eclipse JBoss EAP Server Runtime Setup for Standalone-full-ha profile
=> Eclipse Menu: Window => Preferences => Server => Runtime Environments
MDB Project Setup
=> Then use Menu { File => New => Message Driven Bean } to create HelloMDB message driven bean
=> The source code for HelloMDB:
package com.mahesh;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message-Driven Bean implementation class for: HelloMDB
 */
@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/helloMDB") })
public class HelloMDB implements MessageListener {

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        try {
            System.out.println("Inside HelloMDB");
            System.out.println("Message: " + ((TextMessage) message).getText()
                    + " with ID: " + message.getJMSMessageID());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Servlet Project Setup
=> Use Eclipse Menu { File => New => Dynamic Web Project } to create a Web Project named “TestServlet”
=> Then use Menu { File => New => Servlet } to create TestServlet Servlet
=> The source code for TestServlet: (Remember to add the JBoss hornetq-core JAR to the classpath:
jboss-eap-6.0\modules\org\hornetq\main\hornetq-core-2.2.16.Final-redhat-1.jar)
package com.mahesh;

import java.io.IOException;
import java.io.PrintWriter;
import javax.annotation.Resource;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class TestServlet
 */
@WebServlet("/TestServlet")
public class TestServlet extends HttpServlet {

    @Resource(mappedName = "java:/ConnectionFactory")
    private ConnectionFactory qconFactory;

    @Resource(mappedName = "java:/queue/helloMDB")
    private Queue queue;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public TestServlet() {
        super();
    }

    private static final long serialVersionUID = -8314035702649252239L;

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType("text/html");
        PrintWriter out = resp.getWriter();
        try {
            out.write("<p>Sending messages to <em>helloMDB</em></p>");
            // The same duplicate ID is sent on every request on purpose,
            // so the second request is detected as a duplicate.
            sendMessage("A Sample Message", "A Unique ID");
            out.write("<p>Message sent: A Sample Message</p>");
            out.write("<br/>");
            out.write("<p><i>Go to your JBoss Application Server console or Server log to see the result of messages processing</i></p>");
        } catch (Exception e) {
            e.printStackTrace();
            out.write("<h2>A problem occurred during the delivery of this message</h2>");
            out.write("<br/>");
            out.write("<p><i>Go to the JBoss Application Server console or Server log to see the error stack trace</i></p>");
        }
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp)
            throws ServletException, IOException {
        doGet(req, resp);
    }

    private void sendMessage(final String message, final String uniqueMessageID)
            throws Exception {
        Connection qcon = null;
        try {
            qcon = qconFactory.createConnection();
            Session qsession = qcon.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer qsender = qsession.createProducer(queue);
            TextMessage txtMsg = qsession.createTextMessage(message);
            // org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID gives a
            // dependency error, so the hardcoded value _HQ_DUPL_ID is used.
            txtMsg.setStringProperty("_HQ_DUPL_ID", uniqueMessageID);
            qsender.send(txtMsg);
            System.out.println("Message sent: " + txtMsg);
        } finally {
            // Close the connection exactly once. Exceptions are not caught
            // here, so they propagate to doGet, which reports the failure.
            if (qcon != null) {
                qcon.close();
            }
        }
    }
}
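The servlet above always sends the fixed string "A Unique ID", which is what makes the second request a duplicate. In a real application the ID would normally differ per logical message and stay the same when that message is re-sent. The following is a minimal sketch of one way to do that, assuming each message has some business key such as an order number. The class `DuplicateIds` and the method `forBusinessKey` are hypothetical names for illustration and are not part of HornetQ.

```java
import java.nio.charset.Charset;
import java.util.UUID;

// Hypothetical helper for illustration; not part of HornetQ or of the servlet above.
final class DuplicateIds {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private DuplicateIds() {
    }

    // Derives a stable ID from a business key (for example an order number),
    // so that a retried send of the same logical message carries the same ID
    // while different logical messages get different IDs.
    static String forBusinessKey(String businessKey) {
        // nameUUIDFromBytes is deterministic: equal input bytes give equal UUIDs.
        return UUID.nameUUIDFromBytes(businessKey.getBytes(UTF8)).toString();
    }
}
```

Under these assumptions, the call in sendMessage could become txtMsg.setStringProperty("_HQ_DUPL_ID", DuplicateIds.forBusinessKey(orderNumber)), where orderNumber is whatever key identifies the message in the application.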
Testing
=> { Right Click on Project TestServlet => Run As => Run On Server} and click { Finish }
=> Open a web browser and request the URL http://localhost:8080/TestServlet/TestServlet twice, so that two messages with the same duplicate ID are sent
=> The MDB processing can be checked in the EAP server log (jboss-eap-6.0\standalone\log\server.log)
18:13:36,572 INFO [org.jboss.as.server] (ServerService Thread Pool -- 40) JBAS018559: Deployed "HelloMDB.jar"
……..
……..
18:13:50,943 INFO [org.jboss.as.server] (DeploymentScanner-threads - 1) JBAS018559: Deployed "TestServlet.war"
………
………
18:14:04,610 INFO [stdout] (http-localhost/127.0.0.1:8080-1) Message sent: HornetQMessage[ID:82407695-22e7-11e4-8ea2-279e943ef5a0]:PERSISTENT
18:14:04,660 INFO [stdout] (Thread-0 (HornetQ-client-global-threads-2040637942)) Inside HelloMDB
18:14:04,660 INFO [stdout] (Thread-0 (HornetQ-client-global-threads-2040637942)) Message: A Sample Message with ID: ID:82407695-22e7-11e4-8ea2-279e943ef5a0
……..
……..
18:15:32,059 WARN [org.hornetq.core.postoffice.impl.PostOfficeImpl] (Thread-3 (HornetQ-remoting-threads-HornetQServerImpl::serverUUID=10f71dee-22db-11e4-ac94-a7609448202b-1647507571-5601379)) Duplicate message detected - message will not be routed. Message information:ServerMessage[messageID=2147483672,priority=4, bodySize=1500,expiration=0, durable=true, address=jms.queue.queue/helloMDB,properties=TypedProperties[{_HQ_DUPL_ID=A Unique ID}]]@873503274
18:15:32,060 INFO [stdout] (http-localhost/127.0.0.1:8080-1) Message sent: HornetQMessage[ID:b66157ab-22e7-11e4-8ea2-279e943ef5a0]:PERSISTENT
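=> Notice that the first message is successfully received by the MDB (Inside HelloMDB), while the second message never reaches the MDB: the server detects it as a duplicate (Duplicate message detected) and discards it before it is routed to the queue. The servlet still prints "Message sent" for the second request, because the sender is not notified of the discard.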
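When many requests are sent, checking the log by eye becomes tedious. The following is a small sketch that counts the duplicate warnings in a list of log lines, assuming the warning text is exactly the phrase seen in the log excerpt above. `DuplicateLogScanner` and `countDuplicateWarnings` are hypothetical names chosen for this example.

```java
import java.util.List;

// Hypothetical helper for illustration; it only inspects text lines.
final class DuplicateLogScanner {

    // Phrase taken from the PostOfficeImpl warning in the log excerpt above.
    static final String MARKER = "Duplicate message detected";

    private DuplicateLogScanner() {
    }

    // Counts the lines that contain the duplicate-detection warning.
    static int countDuplicateWarnings(List<String> logLines) {
        int count = 0;
        for (String line : logLines) {
            if (line != null && line.contains(MARKER)) {
                count++;
            }
        }
        return count;
    }
}
```

The lines could be read from server.log with a BufferedReader. For the test run above, the count would be expected to equal the number of repeated requests, which is one.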
Things to Remember
=> The size of the duplicate ID cache is specified by the parameter id-cache-size; the default value is 2000 elements.
=> Whether the cache is also maintained on disk is specified by the parameter persist-id-cache; the default value is true, which means the IDs are persisted to disk.
=> To change these parameters, use jboss-cli and navigate to /subsystem=messaging/hornetq-server=default/
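The practical effect of id-cache-size is that the server remembers only a bounded number of recent IDs. The sketch below is a simplified model of such a fixed-size cache, written only to illustrate that behaviour; it is not HornetQ's implementation, and `BoundedIdCache` and `accept` are hypothetical names. Once more distinct IDs than the capacity have been seen, the oldest ID is forgotten and a late duplicate of it would be accepted again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Simplified model for illustration only; not the server's actual cache.
final class BoundedIdCache {

    private final int capacity;
    private final Deque<String> insertionOrder = new ArrayDeque<String>();
    private final Set<String> ids = new HashSet<String>();

    BoundedIdCache(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Returns true if the ID is not in the cache (the message would be routed),
    // false if it is still remembered (the message would be discarded).
    boolean accept(String id) {
        if (ids.contains(id)) {
            return false;
        }
        if (insertionOrder.size() == capacity) {
            // Cache is full: forget the oldest ID to make room for the new one.
            ids.remove(insertionOrder.removeFirst());
        }
        insertionOrder.addLast(id);
        ids.add(id);
        return true;
    }
}
```

With a capacity of 2, the sequence a, a, b, c, a is accepted, rejected, accepted, accepted, accepted: the last "a" gets through because "c" pushed it out of the cache. This is why the cache size should be chosen larger than the number of messages that can arrive within the window in which a resend is possible.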