点对点系统使用队列进行消息传递。客户端将消息发送到特定队列,特定订阅者监听或接收来自该队列的消息。在 JMS 点对点消息传递系统中,队列用于处理单个发送者和单个消费者。确保您已经阅读了有关JMS 消息模型以及如何在 JMS 中发送和接收消息的先前教程。
使用队列进行 PTP 消息传送#
您现在知道点对点消息传递完全是利用队列。JMS 提供javax.jms.Queue
表示队列对象的功能。我将利用 JMS 2.0 API 来javax.jms.JMSProducer
发送javax.jms.JMSConsumer
和接收消息。
我已经向您展示了多个使用 JMS 队列的示例。以下代码表示在 JMS 2.0 API 中使用 JMS 队列的步骤,只是为了再次刷新一下概念。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
| package lab00.simple.helloworld;
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class TestHelloWorld {
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Queue queue = null;
try {
InitialContext initialContext = new InitialContext();
//Step-1 Create ConnectionFactory
connectionFactory
= (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
//Step-2 Get the Destination
queue = (Queue) initialContext.lookup("jms/PTPQueue");
} catch (NamingException e) {
e.printStackTrace();
}
//Step-3 Create J MSContext
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Step-4a Create a Text Message and send
TextMessage textMessage = jmsContext.createTextMessage("Message using JMS 2.0");
JMSProducer jmsProducer = jmsContext.createProducer().send(queue, textMessage);
//Step-4b Receive the message
TextMessage message = (TextMessage) jmsContext.createConsumer(queue).receive();
System.out.println(message.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
|
注意:在点对点消息系统中,一旦从队列中读取(接收)消息,该消息就会被删除。换句话说,它只能被读取一次,之后就会从队列中删除。
JMS 中的异步消息监听器#
JMS 允许您为 分配一个异步消息侦听器JMSConsumer
。异步侦听器可用于队列和主题。您可以使用 lambda 函数,如下所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| @Test
public void testAsyncConsumer() throws InterruptedException {
try(JMSContext jmsContext = connectionFactory.createContext()){
JMSConsumer consumer = jmsContext.createConsumer(queue);
consumer.setMessageListener(message -> {
try {
System.out.println(message.getBody(String.class));
} catch (JMSException e) {
e.printStackTrace();
}
});
JMSProducer producer = jmsContext.createProducer().send(queue, "Message 1");
producer.send(queue, "Message 2");
producer.send(queue, "Message 3");
//Close the Consumer
consumer.close();
}
}
|
输出
1
2
3
| Message 1
Message 2
Message 3
|
注意:始终建议尽可能使用异步消息侦听器。在本教程中,我为 bravity 使用了同步消息处理,但在项目中使用异步方式是正确的方式。
临时队列#
是javax.jms.TemporaryQueue
为连接持续时间创建的唯一临时队列对象。它只能由创建它的连接使用。TemporaryQueue
如果没有为其分配接收者,JMS 会确保删除它。我们将在下面讨论 TemporaryQueue 在典型的请求-响应场景中的用途,其中接收者使用另一条消息响应发送者。
下面的代码演示了如何创建 TemporaryQueue 并将其链接到消息。
1
2
3
4
5
| TemporaryQueue tempReplyQ = jmsContext.createTemporaryQueue();
//Send the message
Message message = jmsContext.createTextMessage("Temp Queue demo");
message.setJMSReplyTo(tempReplyQ);
|
1
2
3
4
| public interface TemporaryQueue extends Queue {
void
delete() throws JMSException;
}
|
PTP 消息传递中的 QueueBrowser#
A由或javax.jms.QueueBrowser
创建。您可以使用浏览队列中的消息而不删除它们。JMSContext
、Session
、QueueBrowser
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| @Test
public void testQueueBrowser() throws JMSException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
TextMessage message1 = jmsContext.createTextMessage("Start your day with a glass of Water!");
TextMessage message2 = jmsContext.createTextMessage("Remember to do 10 mins stretching");
producer.send(queue, message1);
producer.send(queue, message2);
QueueBrowser qBrowser = jmsContext.createBrowser(queue);
Enumeration msgEnum = qBrowser.getEnumeration();
while (msgEnum.hasMoreElements()) {
TextMessage browsedMsg = (TextMessage) msgEnum.nextElement();
System.out.println("Browsed message: " + browsedMsg.getText());
}
JMSConsumer consumer = jmsContext.createConsumer(queue);
for (int i = 0; i < 2; i++) {
System.out.println("Received message: " + consumer.receiveBody(String.class));
}
}
}
|
输出
1
2
3
4
| Browsed message: Start your day with a glass of Water!
Browsed message: Remember to do 10 mins stretching
Received message: Start your day with a glass of Water!
Received message: Remember to do 10 mins stretching
|
队列的可靠性#
队列通常由管理员创建并长期存在。即使没有消费者接收消息,队列也会保留消息。因此,即使不采取任何特殊措施,它也能使使用非常可靠。
JMS 中的消息请求回复#
存在消息接收者想要回复发送者的用例。在点对点消息系统中,有两种方法可以实现请求-回复场景。
- 通过使用TemporaryQueue。
- 或者使用另一个队列作为回复队列。
1. 使用TemporaryQueue进行消息请求-回复#
以下代码显示了如何使用,TemporaryQueue
消息接收者会在此队列上回复。临时队列是会话范围的(jmsContext 范围),会话关闭时会将其删除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| //SessionFactory & Queue creation skipped for bravity
try(JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
//Use temporary Queue to send and receive messages.
TemporaryQueue replyQueue = jmsContext.createTemporaryQueue();
TextMessage message = jmsContext.createTextMessage("Sender message - Hi there!");
message.setJMSReplyTo(replyQueue);
producer.send(queue, message);
System.out.println(message.getJMSMessageID());
//Message received
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage messageReceived = (TextMessage) consumer.receive();
System.out.println(messageReceived.getText());
//Reply
JMSProducer replyProducer = jmsContext.createProducer();
TextMessage replyMessage = jmsContext.createTextMessage("Reply message - Hi, all well here!");
replyMessage.setJMSCorrelationID(messageReceived.getJMSMessageID());
replyProducer.send(messageReceived.getJMSReplyTo(), replyMessage);
//Receive and process reply
JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
System.out.println( replyConsumer.receiveBody(String.class));
TextMessage replyReceived = (TextMessage) replyConsumer.receive();
System.out.println(replyReceived.getJMSCorrelationID());
replyQueue.delete();
}
|
输出
1
2
| Sender message - Hi there!
Reply message - Hi, all well here!
|
如第 8 行所示,需要在消息头上设置临时队列。我们已经在JMS 消息模型中讨论过消息头。接收方读取消息并了解要回复的队列,如第 21 行所示。
**注意:**少数 JMS 提供商可能需要特殊权限才能创建 temporaryQueue。只需与您的 JMS 管理员沟通以允许该权限即可。
2. 使用另一个队列作为回复队列#
就像创建临时队列一样,您可以拥有多个队列。一个用于消息,另一个用于回复原始消息。在下面的代码中,queue
和replyQueue
的用途完全相同。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| package lab02.message.requestresponse;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
public class MessageRequestResponse {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Queue replyQueue = CommonSettings.getDefaultReplyQueue();
Thread messageproducer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
JMSProducer producer = jmsContext.createProducer();
//Send the message
Message message = jmsContext.createTextMessage("Order placed successfully");
message.setJMSReplyTo(replyQueue);
producer.send(queue, message);
sleep(2000);
JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
TextMessage replyMessage = (TextMessage) replyConsumer.receive();
System.out.println("Received reply: " + replyMessage.getText());
} catch (JMSException | InterruptedException ex) {
ex.printStackTrace();
}
}
};
Thread messageConsumer = new Thread() {
public void run() {
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Receive message
Thread.sleep(1000);
JMSConsumer consumer = jmsContext.createConsumer(queue);
Message msg = consumer.receive();
System.out.println("Received message: " + msg.getBody(String.class));
//Reply message
jmsContext.createProducer().send(msg.getJMSReplyTo(), "Order will be dispatched soon!");
} catch (JMSException | InterruptedException e) {
e.printStackTrace();
}
}
};
messageproducer.start();
messageConsumer.start();
}
}
|
输出
1
2
| Received message: Order placed successfully
Received reply: Order will be dispatched soon!
|
注意:当回复队列中有多个回复时,很难找出消息和回复之间的联系。也就是说,您如何知道哪个 replyMessage 属于哪个消息。解决方案是使用JMSCorrelationID。
在 JMS 点对点请求响应消息传递中使用 JMSCorrelationID#
下面的示例演示了 的使用JMSCorrelationID
。首先将消息发送到队列,接收方侦听该消息并对其进行处理。收到的消息包含JMSreplyTo
队列名称等详细信息。消息接收方在 replyQueue 上发送回复并JMSCorrelationID
从 receivedMessage id 中设置,如 所示line 40
。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| package lab02.message.requestresponse;
import labxx.common.settings.CommonSettings;
import javax.jms.*;
// MessageId and CorrelationId test
public class MessageIdCorrelationId {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
Queue queue = CommonSettings.getDefaultQueue();
Queue replyQueue = CommonSettings.getDefaultReplyQueue();
try (JMSContext jmsContext = connectionFactory.createContext()) {
//Message listener on replyQueue
JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
replyConsumer.setMessageListener(msg -> {
try {
System.out.println("Reply message: " + msg.getBody(String.class));
System.out.println("Reply MessageID: " + msg.getJMSMessageID());
System.out.println("Reply CorrelationID: " + msg.getJMSCorrelationID());
} catch (JMSException e) {
e.printStackTrace();
}
});
//Message1
JMSProducer producer = jmsContext.createProducer();
TextMessage message = jmsContext.createTextMessage("Order placed successfully");
message.setJMSReplyTo(replyQueue);
producer.send(queue, message);
System.out.println("Message1 " + message.getJMSMessageID());
//Receive Message
JMSConsumer consumer = jmsContext.createConsumer(queue);
TextMessage receivedMsg = (TextMessage) consumer.receive();
System.out.println("Message received: " + receivedMsg.getText());
//Reply Message
TextMessage replyMessage = jmsContext.createTextMessage("Order Acknowledged");
replyMessage.setJMSCorrelationID(receivedMsg.getJMSMessageID());
jmsContext.createProducer().send(receivedMsg.getJMSReplyTo(), replyMessage);
//Remember to close Otherwise will throw Exception
replyConsumer.close();
}
}
}
|
1
2
3
4
5
| Message1 ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
Message received: Order placed successfully
Reply message: Order Acknowledged
Reply MessageID: ID:18-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524648
Reply CorrelationID: ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
|
这些都是使用队列的 JMS 点对点消息传递的一部分。您还学习了如何在 JMS 中使用异步消息处理。请在下面的评论中分享您对本文的反馈。
原文链接:JMS Point-to-point messaging in Action