Designing Messaging Applications with Temporary Queues
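The session should be closed as soon as processing is completed so that its resources are released promptly; note that, under the JMS specification, a temporary queue itself lives until it is deleted explicitly or the connection that created it is closed. A JMS session does not support concurrent use, but a connection does: concurrency is achieved by creating several sessions from one connection and giving each thread its own. A session is a single-threaded context for producing and consuming messages. According to the JMS specification: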
A session supports transactions and it is difficult to implement transactions that are multithreaded; a session should not be used concurrently by multiple threads.
Even if you are not using a transacted session, you still need to coordinate individual sessions to implement concurrent processing. The only method of the Session object that may be called concurrently is close(). Blocking calls such as receive() are good candidates for dedicated threads, each with its own session, because they can then run concurrently.
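The one-session-per-thread rule can be modelled without a JMS provider. The following is a minimal sketch, not JMS code: ConfinedSession is a hypothetical stand-in for javax.jms.Session, and the ownership check it performs is an illustration only (the JMS API does not make this check for you). Each worker creates its session inside its own thread and never shares it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Model only: no JMS classes are used here. */
class SessionPerThreadSketch {

    /** Hypothetical stand-in for a session: rejects use from any thread but its creator. */
    static final class ConfinedSession {
        private final Thread owner = Thread.currentThread();
        private int sent = 0; // unsynchronized on purpose, like per-session state

        void send(String text) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("session used outside its owning thread");
            }
            sent++;
        }

        int sentCount() {
            return sent;
        }
    }

    /** Starts one task per worker; each task creates and uses its own session. */
    static int runWorkers(int workers, int messagesEach) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                results.add(pool.submit(() -> {
                    // Created in the worker thread, so this thread is the owner.
                    ConfinedSession session = new ConfinedSession();
                    for (int i = 0; i < messagesEach; i++) {
                        session.send("message " + i);
                    }
                    return session.sentCount();
                }));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get(); // wait for each worker to finish
            }
            return total;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(4, 10));
    }
}
```

In real JMS code the equivalent step would be for each worker to call createSession() on the shared connection from inside its own thread, since the connection, unlike the session, may be used concurrently.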
There is no need to close the consumers (QueueReceiver) of a closed session: closing a session closes its producers and consumers, and closing the connection that created the temporary queue closes all of that connection's sessions as well. Close these objects explicitly rather than relying on garbage collection, which may not reclaim them promptly enough, especially since a JMS provider typically allocates resources outside of the JVM.
The following code excerpt shows how to create Temporary Destinations. We use common interfaces like Session as opposed to specific domain constructs like QueueSession or TopicSession. Sun encourages using common interfaces without programming specific to a single message domain so that the JMS application is domain-independent.
/** Get the Initial Context Factory */
Hashtable env = new Hashtable();
String conFactoryClass = "VendorSpecificCFC";
String url = "MyServer:port";
env.put(Context.INITIAL_CONTEXT_FACTORY, conFactoryClass);
env.put(Context.PROVIDER_URL, url);
env.put(Context.SECURITY_PRINCIPAL, "userid" );
env.put(Context.SECURITY_CREDENTIALS, "password");
Context context = new InitialContext(env);
/** Access the resources by looking up the JNDI context */
/** QueueConnectionFactory is a Factory for QueueConnection Object */
javax.jms.ConnectionFactory qcf =
(javax.jms.ConnectionFactory)context.lookup("queueConFactoryName");
javax.jms.Connection qConnection =
qcf.createConnection("userid", "password");
javax.jms.Queue myQueue =
(javax.jms.Queue)context.lookup("MyQueue");
javax.jms.Topic myTopic =
(javax.jms.Topic)context.lookup("MyTopic");
/** TopicConnectionFactory is a Factory for TopicConnection Object */
javax.jms.ConnectionFactory tcf =
(javax.jms.ConnectionFactory)context.lookup("topicConFactoryName");
javax.jms.Connection tConnection =
tcf.createConnection("userid", "password");
/** Create a Session using Connection Object */
javax.jms.Session qSession =
qConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
javax.jms.Session tSession =
tConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/** Create the Temporary Destinations using the Session Object */
javax.jms.Queue replyQueue = qSession.createTemporaryQueue();
javax.jms.Topic replyTopic = tSession.createTemporaryTopic();
Temporary destinations are a useful tool in message-driven architectures. They have the distinction of being lightweight, and they allow applications to efficiently scale because you can easily expand the number available at runtime. Static destinations must be created in advance and, although useful in other ways, do not offer you this same flexibility. Because of this, temporary destinations are preferable to static destinations despite the extra work required to set up the communication between the sender and receiver.
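The set-up between sender and receiver mentioned above can be sketched as an in-memory model. This is not JMS code: java.util.concurrent queues stand in for destinations, the Request class is a hypothetical stand-in for a message carrying a JMSReplyTo header, and all names are assumptions made for illustration. The requester creates a private reply queue, attaches it to the request, and waits on it with a timeout; the responder replies to whichever queue the request names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory model only: BlockingQueue stands in for a JMS destination. */
class TempQueueSketch {

    /** Hypothetical stand-in for a message with a reply-to header. */
    static final class Request {
        final String text;
        final BlockingQueue<String> replyTo; // plays the role of the temporary queue

        Request(String text, BlockingQueue<String> replyTo) {
            this.text = text;
            this.replyTo = replyTo;
        }
    }

    /** Plays the role of the static queue that the responder listens on. */
    static final BlockingQueue<Request> STATIC_QUEUE = new LinkedBlockingQueue<>();

    /** Responder: handles one request and replies on the queue named in it. */
    static Thread startResponder() {
        Thread t = new Thread(() -> {
            try {
                Request req = STATIC_QUEUE.take();
                req.replyTo.put("reply to: " + req.text);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    /** Requester: one private reply queue per request; returns null on timeout. */
    static String request(String text, long timeoutMillis) {
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();
        try {
            STATIC_QUEUE.put(new Request(text, replyQueue));
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        startResponder();
        System.out.println(request("sampletext", 2000));
    }
}
```

Because each reply queue is visible only to the requester that created it, no other requester can read the reply, which is the property that a shared static reply queue does not give you without extra filtering.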
Thanks to Ron Gates and the ONJava team for proofreading this article.
Thakur Thribhuvan is a senior software engineer at Southwest Airlines with 11 years of experience in design and development. He has been working with Java since its inception, and with JMS since 1999.
/** We start off by initiating a JNDI look up for Queue Connection Factory*/
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, VendorSpecificContextFactory);
env.put(Context.PROVIDER_URL, JMS_URL);
env.put(Context.SECURITY_PRINCIPAL, USERNAME );
env.put(Context.SECURITY_CREDENTIALS, PASSWORD);
Context context = new InitialContext(env);
System.out.println("Got the Initial Context");
/** Access the resources by looking up the JNDI tree */
/** Get the Connection Factory reference*/
QueueConnectionFactory qcf = (QueueConnectionFactory)context.lookup(QCFNAME);
System.out.println("Got the QCF");
/** Get the connection to JMS Server */
javax.jms.QueueConnection qConnection = (javax.jms.QueueConnection)qcf.createQueueConnection(USERNAME, PASSWORD);
/** Get the reference to the static destination that MDB is listening on */
javax.jms.Queue staticQueue = (javax.jms.Queue)context.lookup(STATIC_QUEUE_NAME);
/** Create a Session using Connection Object */
javax.jms.Session qSession = (QueueSession)qConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
/** Create the Temporary Destinations using Session Object */
javax.jms.Queue replyTempQueue = qSession.createTemporaryQueue();
System.out.println("Temporary Queue Name:" + replyTempQueue);
/** Create a Message - in our example a text message */
TextMessage message= qSession.createTextMessage();
message.setText("sampletext"); // in most cases it is an xml
/**Optional - set message properties - if any for example user, department*/
message.setStringProperty(USER, USER);
message.setStringProperty(DEPT, DEPT);
/** Set the temporary queue name as a message header - JMSReplyTo*/
message.setJMSReplyTo(replyTempQueue); //mandatory in the context of temporary destinations
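/** Optional - message expiration. The provider overwrites JMSExpiration when the
* message is sent, so calling message.setJMSExpiration() here has no effect;
* set a time-to-live on the sender instead, e.g. sender.setTimeToLive(30000) */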
/**Assuming the MDB is active and accepting requests, depending upon the vendor,
* you could receive the reply within a second or a couple of seconds - depends upon
* the complexity of the workflow you are dealing with on the Facade side*/
/** Create Receiver - We are now ready to listen on the temporary Queue*/
QueueReceiver receiver= ((QueueSession)qSession).createReceiver(replyTempQueue);
/** Create worker thread that makes the blocking call*/
RequestHelper worker = new RequestHelper(qSession, qConnection,replyTempQueue,receiver );
worker.start();
/** At this point a new thread is spawned and waiting for the reply*/
/** Create a sender to the static queue using the session*/
QueueSender sender= ((QueueSession)qSession).createSender(staticQueue);
/** Optional - messages sent by this sender expire after 30 seconds*/
sender.setTimeToLive(30000);
/** send the message*/
sender.send(message);
System.out.println("Message Sent to Static Queue:" + staticQueue.getQueueName());
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Session;
import javax.jms.TextMessage;

public class RequestHelper extends Thread
{
    private Connection conn;
    private MessageConsumer receiver;
    private Session qSession;
    private Queue tempQueue; /** Used only for logging purposes*/
    private String threadId; /** Used only for logging purposes*/

    public RequestHelper(Session session, Connection conn, Queue tmpQue, QueueReceiver queueReceiver) throws JMSException
    {
        this.receiver = queueReceiver;
        this.conn = conn;
        this.qSession = session;
        this.threadId = getName();
        this.tempQueue = tmpQue;
        System.out.println("Temporary QueueName:" + threadId + ":" + tmpQue.getQueueName());
    }

    public void run()
    {
        try
        {
            System.out.println(threadId + ":Calling Connection.start");
            conn.start();
            System.out.println(threadId + ":Connection.Start Called");
            System.out.println(threadId + ":Delivery of Messages Started");
            String receivedText = null;
            System.out.println(threadId + ":Waiting for Messages on Receiver");
            /** Blocks until a reply arrives on the temporary queue */
            Message messageM = receiver.receive();
            System.out.println(threadId + ":Processing of Response started, Unpacking TextMessage");
            TextMessage textMessage = (TextMessage)messageM;
            receivedText = textMessage.getText();
            System.out.println(threadId + ":Successfully Completed Processing of Response" + threadId + ":" + receivedText);
            receiver.close();
            qSession.close();
        }
        catch (JMSException exp)
        {
            exp.printStackTrace();
        }
    }
}
The session should be closed as soon as processing is completed so that TemporaryQueues will be deleted on the server side
My question is whether it's a good idea to use temporary queues to send messages between the server and a rich client when there are about 5000 clients (5000 temporary queues). In this scenario the server sends messages to a specific user in response to an event on the server side.
In this scenario we couldn't use a single filtered static queue, because other clients with access to it would be able to read the messages (security).
I don't know if queues are the best approach in this scenario.
regards
Gastón