[译]JMS 点对点消息传递的实际应用
点对点系统使用队列进行消息传递。客户端将消息发送到特定队列,特定订阅者监听或接收来自该队列的消息。在 JMS 点对点消息传递系统中,队列用于处理单个发送者和单个消费者。确保您已经阅读了有关JMS 消息模型以及如何在 JMS 中发送和接收消息的先前教程。
使用队列进行 PTP 消息传送
您现在知道点对点消息传递完全是利用队列。JMS 提供javax.jms.Queue
表示队列对象的功能。我将利用 JMS 2.0 API 来javax.jms.JMSProducer
发送javax.jms.JMSConsumer
和接收消息。
我已经向您展示了多个使用 JMS 队列的示例。以下代码表示在 JMS 2.0 API 中使用 JMS 队列的步骤,只是为了再次刷新一下概念。
package lab00.simple.helloworld;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class TestHelloWorld {
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Queue queue = null;
try {
InitialContext initialContext = new InitialContext();
//Step-1 Create ConnectionFactory
connectionFactory
= (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
//Step-2 Get the Destination
queue = (Queue) initialContext.lookup("jms/PTPQueue");
} catch (NamingException e) {
e.printStackTrace();
}
//Step-3 Create J MSContext
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Step-4a Create a Text Message and send
TextMessage textMessage = jmsContext.createTextMessage("Message using JMS 2.0");
JMSProducer jmsProducer = jmsContext.createProducer().send(queue, textMessage);
//Step-4b Receive the message
TextMessage message = (TextMessage) jmsContext.createConsumer(queue).receive();
System.out.println(message.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
注意:在点对点消息系统中,一旦从队列中读取(接收)消息,该消息就会被删除。换句话说,它只能被读取一次,之后就会从队列中删除。
JMS 中的异步消息监听器
JMS 允许您为 分配一个异步消息侦听器JMSConsumer
。异步侦听器可用于队列和主题。您可以使用 lambda 函数,如下所示。
@Test
public void testAsyncConsumer() throws InterruptedException {
try(JMSContext jmsContext = connectionFactory.createContext()){
JMSConsumer consumer = jmsContext.createConsumer(queue);
consumer.setMessageListener(message -> {
try {
System.out.println(message.getBody(String.class));
} catch (JMSException e) {
e.printStackTrace();
}
});
JMSProducer producer = jmsContext.createProducer().send(queue, "Message 1");
producer.send(queue, "Message 2");
producer.send(queue, "Message 3");
//Close the Consumer
consumer.close();
}
}
输出
Message 1
Message 2
Message 3
注意:始终建议尽可能使用异步消息侦听器。在本教程中,我为 bravity 使用了同步消息处理,但在项目中使用异步方式是正确的方式。
临时队列
是javax.jms.TemporaryQueue
为连接持续时间创建的唯一临时队列对象。它只能由创建它的连接使用。TemporaryQueue
如果没有为其分配接收者,JMS 会确保删除它。我们将在下面讨论 TemporaryQueue 在典型的请求-响应场景中的用途,其中接收者使用另一条消息响应发送者。
下面的代码演示了如何创建 TemporaryQueue 并将其链接到消息。
TemporaryQueue tempReplyQ = jmsContext.createTemporaryQueue();
//Send the message
Message message = jmsContext.createTextMessage("Temp Queue demo");
message.setJMSReplyTo(tempReplyQ);
public interface TemporaryQueue extends Queue {
void
delete() throws JMSException;
}
PTP 消息传递中的 QueueBrowser
A由或javax.jms.QueueBrowser
创建。您可以使用浏览队列中的消息而不删除它们。JMSContext
、Session
、QueueBrowser
@Test
public void testQueueBrowser() throws JMSException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
TextMessage message1 = jmsContext.createTextMessage("Start your day with a glass of Water!");
TextMessage message2 = jmsContext.createTextMessage("Remember to do 10 mins stretching");
producer.send(queue, message1);
producer.send(queue, message2);
QueueBrowser qBrowser = jmsContext.createBrowser(queue);
Enumeration msgEnum = qBrowser.getEnumeration();
while (msgEnum.hasMoreElements()) {
TextMessage browsedMsg = (TextMessage) msgEnum.nextElement();
System.out.println("Browsed message: " + browsedMsg.getText());
}
JMSConsumer consumer = jmsContext.createConsumer(queue);
for (int i = 0; i < 2; i++) {
System.out.println("Received message: " + consumer.receiveBody(String.class));
}
}
}
输出
Browsed message: Start your day with a glass of Water!
Browsed message: Remember to do 10 mins stretching
Received message: Start your day with a glass of Water!
Received message: Remember to do 10 mins stretching
队列的可靠性
队列通常由管理员创建并长期存在。即使没有消费者接收消息,队列也会保留消息。因此,即使不采取任何特殊措施,它也能使使用非常可靠。
JMS 中的消息请求回复
存在消息接收者想要回复发送者的用例。在点对点消息系统中,有两种方法可以实现请求-回复场景。
- 通过使用TemporaryQueue。
- 或者使用另一个队列作为回复队列。
1. 使用TemporaryQueue进行消息请求-回复
以下代码显示了如何使用,TemporaryQueue
消息接收者会在此队列上回复。临时队列是会话范围的(jmsContext 范围),会话关闭时会将其删除。
//SessionFactory & Queue creation skipped for bravity
try(JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
//Use temporary Queue to send and receive messages.
TemporaryQueue replyQueue = jmsContext.createTemporaryQueue();
TextMessage message = jmsContext.createTextMessage("Sender message - Hi there!");
message.setJMSReplyTo(replyQueue);
producer.send(queue, message);
System.out.println(message.getJMSMessageID());
//Message received
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage messageReceived = (TextMessage) consumer.receive();
System.out.println(messageReceived.getText());
//Reply
JMSProducer replyProducer = jmsContext.createProducer();
TextMessage replyMessage = jmsContext.createTextMessage("Reply message - Hi, all well here!");
replyMessage.setJMSCorrelationID(messageReceived.getJMSMessageID());
replyProducer.send(messageReceived.getJMSReplyTo(), replyMessage);
//Receive and process reply
JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
System.out.println( replyConsumer.receiveBody(String.class));
TextMessage replyReceived = (TextMessage) replyConsumer.receive();
System.out.println(replyReceived.getJMSCorrelationID());
replyQueue.delete();
}
输出
Sender message - Hi there!
Reply message - Hi, all well here!
如第 8 行所示,需要在消息头上设置临时队列。我们已经在JMS 消息模型中讨论过消息头。接收方读取消息并了解要回复的队列,如第 21 行所示。
**注意:**少数 JMS 提供商可能需要特殊权限才能创建 temporaryQueue。只需与您的 JMS 管理员沟通以允许该权限即可。
2. 使用另一个队列作为回复队列
就像创建临时队列一样,您可以拥有多个队列。一个用于消息,另一个用于回复原始消息。在下面的代码中,queue
和replyQueue
的用途完全相同。
package lab02.message.requestresponse;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class MessageRequestResponse {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Queue replyQueue = CommonSettings.getDefaultReplyQueue();
Thread messageproducer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
//Send the message
Message message = jmsContext.createTextMessage("Order placed successfully");
message.setJMSReplyTo(replyQueue);
producer.send(queue, message);
sleep(2000);
JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
TextMessage replyMessage = (TextMessage) replyConsumer.receive();
System.out.println("Received reply: " + replyMessage.getText());
} catch (JMSException | InterruptedException ex) {
ex.printStackTrace();
}
}
};
Thread messageConsumer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Receive message
Thread.sleep(1000);
JMSConsumer consumer = jmsContext.createConsumer(queue);
Message msg = consumer.receive();
System.out.println("Received message: " + msg.getBody(String.class));
//Reply message
jmsContext.createProducer().send(msg.getJMSReplyTo(), "Order will be dispatched soon!");
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
};
messageproducer.start();
messageConsumer.start();
}
}
输出
Received message: Order placed successfully
Received reply: Order will be dispatched soon!
注意:当回复队列中有多个回复时,很难找出消息和回复之间的联系。也就是说,您如何知道哪个 replyMessage 属于哪个消息。解决方案是使用JMSCorrelationID。
在 JMS 点对点请求响应消息传递中使用 JMSCorrelationID
下面的示例演示了 的使用JMSCorrelationID
。首先将消息发送到队列,接收方侦听该消息并对其进行处理。收到的消息包含JMSreplyTo
队列名称等详细信息。消息接收方在 replyQueue 上发送回复并JMSCorrelationID
从 receivedMessage id 中设置,如 所示line 40
。
package lab02.message.requestresponse;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
// MessageId and CorrelationId test
public class MessageIdCorrelationId {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Queue replyQueue = CommonSettings.getDefaultReplyQueue();
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Message listener on replyQueue
JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
replyConsumer.setMessageListener(msg -> {
try {
System.out.println("Reply message: " + msg.getBody(String.class));
System.out.println("Reply MessageID: " + msg.getJMSMessageID());
System.out.println("Reply CorrelationID: " + msg.getJMSCorrelationID());
} catch (JMSException e) {
e.printStackTrace();
}
});
//Message1
JMSProducer producer = jmsContext.createProducer();
TextMessage message = jmsContext.createTextMessage("Order placed successfully");
message.setJMSReplyTo(replyQueue);
producer.send(queue, message);
System.out.println("Message1 " + message.getJMSMessageID());
//Receive Message
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage receivedMsg = (TextMessage) consumer.receive();
System.out.println("Message received: " + receivedMsg.getText());
//Reply Message
TextMessage replyMessage = jmsContext.createTextMessage("Order Acknowledged");
replyMessage.setJMSCorrelationID(receivedMsg.getJMSMessageID());
jmsContext.createProducer().send(receivedMsg.getJMSReplyTo(), replyMessage);
//Remember to close Otherwise will throw Exception
replyConsumer.close();
}
}
}
Message1 ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
Message received: Order placed successfully
Reply message: Order Acknowledged
Reply MessageID: ID:18-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524648
Reply CorrelationID: ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
这些都是使用队列的 JMS 点对点消息传递的一部分。您还学习了如何在 JMS 中使用异步消息处理。请在下面的评论中分享您对本文的反馈。