[译]JMS 点对点消息传递的实际应用
点对点系统使用队列进行消息传递。客户端将消息发送到特定队列,特定订阅者监听或接收来自该队列的消息。在 JMS 点对点消息传递系统中,队列用于处理单个发送者和单个消费者。确保您已经阅读了有关JMS 消息模型以及如何在 JMS 中发送和接收消息的先前教程。
使用队列进行 PTP 消息传送
您现在知道点对点消息传递完全是利用队列。JMS 提供javax.jms.Queue表示队列对象的功能。我将利用 JMS 2.0 API 来javax.jms.JMSProducer发送javax.jms.JMSConsumer和接收消息。
我已经向您展示了多个使用 JMS 队列的示例。以下代码表示在 JMS 2.0 API 中使用 JMS 队列的步骤,只是为了再次刷新一下概念。
package lab00.simple.helloworld;
import javax.jms.*;import javax.naming.InitialContext;import javax.naming.NamingException;
public class TestHelloWorld { public static void main(String[] args) {
ConnectionFactory connectionFactory = null; Queue queue = null;
try { InitialContext initialContext = new InitialContext();
//Step-1 Create ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
//Step-2 Get the Destination queue = (Queue) initialContext.lookup("jms/PTPQueue"); } catch (NamingException e) { e.printStackTrace(); }
//Step-3 Create J MSContext try (JMSContext jmsContext = connectionFactory.createContext()) {
//Step-4a Create a Text Message and send TextMessage textMessage = jmsContext.createTextMessage("Message using JMS 2.0"); JMSProducer jmsProducer = jmsContext.createProducer().send(queue, textMessage);
//Step-4b Receive the message TextMessage message = (TextMessage) jmsContext.createConsumer(queue).receive(); System.out.println(message.getText()); } catch (JMSException e) { e.printStackTrace(); } }}注意:在点对点消息系统中,一旦从队列中读取(接收)消息,该消息就会被删除。换句话说,它只能被读取一次,之后就会从队列中删除。
JMS 中的异步消息监听器
JMS 允许您为 分配一个异步消息侦听器JMSConsumer。异步侦听器可用于队列和主题。您可以使用 lambda 函数,如下所示。
@Testpublic void testAsyncConsumer() throws InterruptedException { try(JMSContext jmsContext = connectionFactory.createContext()){ JMSConsumer consumer = jmsContext.createConsumer(queue); consumer.setMessageListener(message -> { try { System.out.println(message.getBody(String.class)); } catch (JMSException e) { e.printStackTrace(); } });
JMSProducer producer = jmsContext.createProducer().send(queue, "Message 1"); producer.send(queue, "Message 2"); producer.send(queue, "Message 3");
//Close the Consumer consumer.close(); }}输出
Message 1Message 2Message 3注意:始终建议尽可能使用异步消息侦听器。在本教程中,我为 bravity 使用了同步消息处理,但在项目中使用异步方式是正确的方式。
临时队列
是javax.jms.TemporaryQueue为连接持续时间创建的唯一临时队列对象。它只能由创建它的连接使用。TemporaryQueue如果没有为其分配接收者,JMS 会确保删除它。我们将在下面讨论 TemporaryQueue 在典型的请求-响应场景中的用途,其中接收者使用另一条消息响应发送者。
下面的代码演示了如何创建 TemporaryQueue 并将其链接到消息。
TemporaryQueue tempReplyQ = jmsContext.createTemporaryQueue();
//Send the messageMessage message = jmsContext.createTextMessage("Temp Queue demo");message.setJMSReplyTo(tempReplyQ);public interface TemporaryQueue extends Queue { void delete() throws JMSException;}PTP 消息传递中的 QueueBrowser
A由或javax.jms.QueueBrowser创建。您可以使用浏览队列中的消息而不删除它们。JMSContext、Session、QueueBrowser
@Test public void testQueueBrowser() throws JMSException { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer(); TextMessage message1 = jmsContext.createTextMessage("Start your day with a glass of Water!"); TextMessage message2 = jmsContext.createTextMessage("Remember to do 10 mins stretching");
producer.send(queue, message1); producer.send(queue, message2);
QueueBrowser qBrowser = jmsContext.createBrowser(queue); Enumeration msgEnum = qBrowser.getEnumeration(); while (msgEnum.hasMoreElements()) { TextMessage browsedMsg = (TextMessage) msgEnum.nextElement(); System.out.println("Browsed message: " + browsedMsg.getText()); }
JMSConsumer consumer = jmsContext.createConsumer(queue); for (int i = 0; i < 2; i++) { System.out.println("Received message: " + consumer.receiveBody(String.class)); } } }输出
Browsed message: Start your day with a glass of Water!Browsed message: Remember to do 10 mins stretchingReceived message: Start your day with a glass of Water!Received message: Remember to do 10 mins stretching队列的可靠性
队列通常由管理员创建并长期存在。即使没有消费者接收消息,队列也会保留消息。因此,即使不采取任何特殊措施,它也能使使用非常可靠。
JMS 中的消息请求回复
存在消息接收者想要回复发送者的用例。在点对点消息系统中,有两种方法可以实现请求-回复场景。
- 通过使用TemporaryQueue。
- 或者使用另一个队列作为回复队列。
1. 使用TemporaryQueue进行消息请求-回复
以下代码显示了如何使用,TemporaryQueue消息接收者会在此队列上回复。临时队列是会话范围的(jmsContext 范围),会话关闭时会将其删除。
//SessionFactory & Queue creation skipped for bravitytry(JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer();
//Use temporary Queue to send and receive messages. TemporaryQueue replyQueue = jmsContext.createTemporaryQueue(); TextMessage message = jmsContext.createTextMessage("Sender message - Hi there!"); message.setJMSReplyTo(replyQueue); producer.send(queue, message); System.out.println(message.getJMSMessageID());
//Message received JMSConsumer consumer = jmsContext.createConsumer(queue); TextMessage messageReceived = (TextMessage) consumer.receive(); System.out.println(messageReceived.getText());
//Reply JMSProducer replyProducer = jmsContext.createProducer(); TextMessage replyMessage = jmsContext.createTextMessage("Reply message - Hi, all well here!"); replyMessage.setJMSCorrelationID(messageReceived.getJMSMessageID()); replyProducer.send(messageReceived.getJMSReplyTo(), replyMessage);
//Receive and process reply JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue); System.out.println( replyConsumer.receiveBody(String.class)); TextMessage replyReceived = (TextMessage) replyConsumer.receive(); System.out.println(replyReceived.getJMSCorrelationID()); replyQueue.delete(); }输出
Sender message - Hi there!Reply message - Hi, all well here!如第 8 行所示,需要在消息头上设置临时队列。我们已经在JMS 消息模型中讨论过消息头。接收方读取消息并了解要回复的队列,如第 21 行所示。
**注意:**少数 JMS 提供商可能需要特殊权限才能创建 temporaryQueue。只需与您的 JMS 管理员沟通以允许该权限即可。
2. 使用另一个队列作为回复队列
就像创建临时队列一样,您可以拥有多个队列。一个用于消息,另一个用于回复原始消息。在下面的代码中,queue和replyQueue的用途完全相同。
package lab02.message.requestresponse;
import labxx.common.settings.CommonSettings;import javax.jms.*;
public class MessageRequestResponse { public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory(); Queue queue = CommonSettings.getDefaultQueue(); Queue replyQueue = CommonSettings.getDefaultReplyQueue();
Thread messageproducer = new Thread() { public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer(); //Send the message Message message = jmsContext.createTextMessage("Order placed successfully"); message.setJMSReplyTo(replyQueue); producer.send(queue, message); sleep(2000); JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue); TextMessage replyMessage = (TextMessage) replyConsumer.receive(); System.out.println("Received reply: " + replyMessage.getText()); } catch (JMSException | InterruptedException ex) { ex.printStackTrace(); } } };
Thread messageConsumer = new Thread() { public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { //Receive message Thread.sleep(1000); JMSConsumer consumer = jmsContext.createConsumer(queue); Message msg = consumer.receive(); System.out.println("Received message: " + msg.getBody(String.class)); //Reply message jmsContext.createProducer().send(msg.getJMSReplyTo(), "Order will be dispatched soon!"); } catch (JMSException | InterruptedException e) { e.printStackTrace(); } } };
messageproducer.start(); messageConsumer.start(); }}输出
Received message: Order placed successfullyReceived reply: Order will be dispatched soon!注意:当回复队列中有多个回复时,很难找出消息和回复之间的联系。也就是说,您如何知道哪个 replyMessage 属于哪个消息。解决方案是使用JMSCorrelationID。
在 JMS 点对点请求响应消息传递中使用 JMSCorrelationID
下面的示例演示了 的使用JMSCorrelationID。首先将消息发送到队列,接收方侦听该消息并对其进行处理。收到的消息包含JMSreplyTo队列名称等详细信息。消息接收方在 replyQueue 上发送回复并JMSCorrelationID从 receivedMessage id 中设置,如 所示line 40。
package lab02.message.requestresponse;
import labxx.common.settings.CommonSettings;import javax.jms.*;
// MessageId and CorrelationId testpublic class MessageIdCorrelationId { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory(); Queue queue = CommonSettings.getDefaultQueue(); Queue replyQueue = CommonSettings.getDefaultReplyQueue();
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Message listener on replyQueue JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue); replyConsumer.setMessageListener(msg -> { try { System.out.println("Reply message: " + msg.getBody(String.class)); System.out.println("Reply MessageID: " + msg.getJMSMessageID()); System.out.println("Reply CorrelationID: " + msg.getJMSCorrelationID()); } catch (JMSException e) { e.printStackTrace(); } });
//Message1 JMSProducer producer = jmsContext.createProducer(); TextMessage message = jmsContext.createTextMessage("Order placed successfully"); message.setJMSReplyTo(replyQueue); producer.send(queue, message); System.out.println("Message1 " + message.getJMSMessageID());
//Receive Message JMSConsumer consumer = jmsContext.createConsumer(queue); TextMessage receivedMsg = (TextMessage) consumer.receive(); System.out.println("Message received: " + receivedMsg.getText());
//Reply Message TextMessage replyMessage = jmsContext.createTextMessage("Order Acknowledged"); replyMessage.setJMSCorrelationID(receivedMsg.getJMSMessageID()); jmsContext.createProducer().send(receivedMsg.getJMSReplyTo(), replyMessage);
/Remember to close Otherwise will throw Exception replyConsumer.close(); } }}Message1 ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639Message received: Order placed successfullyReply message: Order AcknowledgedReply MessageID: ID:18-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524648Reply CorrelationID: ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639这些都是使用队列的 JMS 点对点消息传递的一部分。您还学习了如何在 JMS 中使用异步消息处理。请在下面的评论中分享您对本文的反馈。