点对点系统使用队列进行消息传递。客户端将消息发送到特定队列,特定订阅者监听或接收来自该队列的消息。在 JMS 点对点消息传递系统中,队列用于处理单个发送者和单个消费者。确保您已经阅读了有关JMS 消息模型以及如何在 JMS 中发送和接收消息的先前教程。

使用队列进行 PTP 消息传送

您现在知道点对点消息传递完全是利用队列。JMS 提供javax.jms.Queue表示队列对象的功能。我将利用 JMS 2.0 API 来javax.jms.JMSProducer发送javax.jms.JMSConsumer和接收消息。

我已经向您展示了多个使用 JMS 队列的示例。以下代码表示在 JMS 2.0 API 中使用 JMS 队列的步骤,只是为了再次刷新一下概念。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package lab00.simple.helloworld;

import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestHelloWorld {
  public static void main(String[] args) {

    ConnectionFactory connectionFactory = null;
    Queue queue = null;

    try {
      InitialContext initialContext = new InitialContext();
  
      //Step-1 Create ConnectionFactory
      connectionFactory
          = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory");
 
      //Step-2 Get the Destination
      queue = (Queue) initialContext.lookup("jms/PTPQueue");
    } catch (NamingException e) {
      e.printStackTrace();
    }

    //Step-3 Create J MSContext
    try (JMSContext jmsContext = connectionFactory.createContext()) {
 
      //Step-4a Create a Text Message and send
      TextMessage textMessage = jmsContext.createTextMessage("Message using JMS 2.0");
      JMSProducer jmsProducer = jmsContext.createProducer().send(queue, textMessage);

      //Step-4b Receive the message
      TextMessage message = (TextMessage) jmsContext.createConsumer(queue).receive();
      System.out.println(message.getText());
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

注意:在点对点消息系统中,一旦从队列中读取(接收)消息,该消息就会被删除。换句话说,它只能被读取一次,之后就会从队列中删除。

JMS 中的异步消息监听器

JMS 允许您为 分配一个异步消息侦听器JMSConsumer。异步侦听器可用于队列和主题。您可以使用 lambda 函数,如下所示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Test
public void testAsyncConsumer() throws InterruptedException {
  try(JMSContext jmsContext = connectionFactory.createContext()){
    JMSConsumer consumer = jmsContext.createConsumer(queue);
    consumer.setMessageListener(message -> {
      try {
        System.out.println(message.getBody(String.class));
      } catch (JMSException e) {
        e.printStackTrace();
      }
    });

    JMSProducer producer = jmsContext.createProducer().send(queue, "Message 1");
    producer.send(queue, "Message 2");
    producer.send(queue, "Message 3");

    //Close the Consumer
    consumer.close();
  }
}

输出

1
2
3
Message 1
Message 2
Message 3

注意:始终建议尽可能使用异步消息侦听器。在本教程中,我为 bravity 使用了同步消息处理,但在项目中使用异步方式是正确的方式。

临时队列

javax.jms.TemporaryQueue为连接持续时间创建的唯一临时队列对象。它只能由创建它的连接使用。TemporaryQueue如果没有为其分配接收者,JMS 会确保删除它。我们将在下面讨论 TemporaryQueue 在典型的请求-响应场景中的用途,其中接收者使用另一条消息响应发送者。

下面的代码演示了如何创建 TemporaryQueue 并将其链接到消息。

1
2
3
4
5
TemporaryQueue tempReplyQ = jmsContext.createTemporaryQueue();

//Send the message
Message message = jmsContext.createTextMessage("Temp Queue demo");
message.setJMSReplyTo(tempReplyQ);
1
2
3
4
public interface TemporaryQueue extends Queue {
    void 
    delete() throws JMSException; 
}

PTP 消息传递中的 QueueBrowser

A由或javax.jms.QueueBrowser创建。您可以使用浏览队列中的消息而不删除它们。JMSContextSessionQueueBrowser

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  @Test
  public void testQueueBrowser() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      TextMessage message1 = jmsContext.createTextMessage("Start your day with a glass of Water!");
      TextMessage message2 = jmsContext.createTextMessage("Remember to do 10 mins stretching");

      producer.send(queue, message1);
      producer.send(queue, message2);

      QueueBrowser qBrowser = jmsContext.createBrowser(queue);
      Enumeration msgEnum = qBrowser.getEnumeration();
      while (msgEnum.hasMoreElements()) {
        TextMessage browsedMsg = (TextMessage) msgEnum.nextElement();
        System.out.println("Browsed message: " + browsedMsg.getText());
      }

      JMSConsumer consumer = jmsContext.createConsumer(queue);
      for (int i = 0; i < 2; i++) {
        System.out.println("Received message: " + consumer.receiveBody(String.class));
      }
    }
  }

输出

1
2
3
4
Browsed message: Start your day with a glass of Water!
Browsed message: Remember to do 10 mins stretching
Received message: Start your day with a glass of Water!
Received message: Remember to do 10 mins stretching

队列的可靠性

队列通常由管理员创建并长期存在。即使没有消费者接收消息,队列也会保留消息。因此,即使不采取任何特殊措施,它也能使使用非常可靠。

JMS 中的消息请求回复

存在消息接收者想要回复发送者的用例。在点对点消息系统中,有两种方法可以实现请求-回复场景。

  1. 通过使用TemporaryQueue
  2. 或者使用另一个队列作为回复队列。

1. 使用TemporaryQueue进行消息请求-回复

以下代码显示了如何使用,TemporaryQueue消息接收者会在此队列上回复。临时队列是会话范围的(jmsContext 范围),会话关闭时会将其删除。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//SessionFactory & Queue creation skipped for bravity
try(JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();

      //Use temporary Queue to send and receive messages.
      TemporaryQueue replyQueue = jmsContext.createTemporaryQueue();
      TextMessage message = jmsContext.createTextMessage("Sender message - Hi there!");
      message.setJMSReplyTo(replyQueue);
      producer.send(queue, message);
      System.out.println(message.getJMSMessageID());

      //Message received
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      TextMessage messageReceived = (TextMessage) consumer.receive();
      System.out.println(messageReceived.getText());

      //Reply
      JMSProducer replyProducer = jmsContext.createProducer();
      TextMessage replyMessage = jmsContext.createTextMessage("Reply message - Hi, all well here!");
      replyMessage.setJMSCorrelationID(messageReceived.getJMSMessageID());
      replyProducer.send(messageReceived.getJMSReplyTo(), replyMessage);
 
      //Receive and process reply
      JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
      System.out.println( replyConsumer.receiveBody(String.class));
      TextMessage replyReceived = (TextMessage) replyConsumer.receive();
      System.out.println(replyReceived.getJMSCorrelationID());
      replyQueue.delete();
    }

输出

1
2
Sender message - Hi there!
Reply message - Hi, all well here!

如第 8 行所示,需要在消息头上设置临时队列。我们已经在JMS 消息模型中讨论过消息头。接收方读取消息并了解要回复的队列,如第 21 行所示。

**注意:**少数 JMS 提供商可能需要特殊权限才能创建 temporaryQueue。只需与您的 JMS 管理员沟通以允许该权限即可。

2. 使用另一个队列作为回复队列

就像创建临时队列一样,您可以拥有多个队列。一个用于消息,另一个用于回复原始消息。在下面的代码中,queuereplyQueue的用途完全相同。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package lab02.message.requestresponse;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class MessageRequestResponse {
  public static void main(String[] args) throws JMSException, InterruptedException {

    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();
    Queue replyQueue = CommonSettings.getDefaultReplyQueue();

    Thread messageproducer = new Thread() {
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          JMSProducer producer = jmsContext.createProducer();
          //Send the message
          Message message = jmsContext.createTextMessage("Order placed successfully");
          message.setJMSReplyTo(replyQueue);
          producer.send(queue, message);
          sleep(2000);
          JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
          TextMessage replyMessage = (TextMessage) replyConsumer.receive();
          System.out.println("Received reply: " + replyMessage.getText());
        } catch (JMSException | InterruptedException ex) {
          ex.printStackTrace();
        }
      }
    };

    Thread messageConsumer = new Thread() {
      public void run() {
        try (JMSContext jmsContext = connectionFactory.createContext()) {
          //Receive message
          Thread.sleep(1000);
          JMSConsumer consumer = jmsContext.createConsumer(queue);
          Message msg = consumer.receive();
          System.out.println("Received message: " + msg.getBody(String.class));
          //Reply message
          jmsContext.createProducer().send(msg.getJMSReplyTo(), "Order will be dispatched soon!");
        } catch (JMSException | InterruptedException e) {
          e.printStackTrace();
        }
      }
    };

    messageproducer.start();
    messageConsumer.start();
  }
}

输出

1
2
Received message: Order placed successfully
Received reply: Order will be dispatched soon!

注意:当回复队列中有多个回复时,很难找出消息和回复之间的联系。也就是说,您如何知道哪个 replyMessage 属于哪个消息。解决方案是使用JMSCorrelationID

在 JMS 点对点请求响应消息传递中使用 JMSCorrelationID

下面的示例演示了 的使用JMSCorrelationID。首先将消息发送到队列,接收方侦听该消息并对其进行处理。收到的消息包含JMSreplyTo队列名称等详细信息。消息接收方在 replyQueue 上发送回复并JMSCorrelationID从 receivedMessage id 中设置,如 所示line 40

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package lab02.message.requestresponse;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

// MessageId and CorrelationId test
public class MessageIdCorrelationId {
  public static void main(String[] args) throws JMSException, InterruptedException {
    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();
    Queue replyQueue = CommonSettings.getDefaultReplyQueue();

    try (JMSContext jmsContext = connectionFactory.createContext()) {

      //Message listener on replyQueue
      JMSConsumer replyConsumer = jmsContext.createConsumer(replyQueue);
      replyConsumer.setMessageListener(msg -> {
        try {
          System.out.println("Reply message: " + msg.getBody(String.class));
          System.out.println("Reply MessageID: " + msg.getJMSMessageID());
          System.out.println("Reply CorrelationID: " + msg.getJMSCorrelationID());
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });

      //Message1
      JMSProducer producer = jmsContext.createProducer();
      TextMessage message = jmsContext.createTextMessage("Order placed successfully");
      message.setJMSReplyTo(replyQueue);
      producer.send(queue, message);
      System.out.println("Message1 " + message.getJMSMessageID());

      //Receive Message
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      TextMessage receivedMsg = (TextMessage) consumer.receive();
      System.out.println("Message received: " + receivedMsg.getText());

      //Reply Message
      TextMessage replyMessage = jmsContext.createTextMessage("Order Acknowledged");
      replyMessage.setJMSCorrelationID(receivedMsg.getJMSMessageID());
      jmsContext.createProducer().send(receivedMsg.getJMSReplyTo(), replyMessage);
 
      //Remember to close Otherwise will throw Exception
      replyConsumer.close();
    }
  }
}
1
2
3
4
5
Message1 ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639
Message received: Order placed successfully
Reply message: Order Acknowledged
Reply MessageID: ID:18-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524648
Reply CorrelationID: ID:12-192.168.0.197(bc:c9:94:43:39:c4)-61701-1576468524639

这些都是使用队列的 JMS 点对点消息传递的一部分。您还学习了如何在 JMS 中使用异步消息处理。请在下面的评论中分享您对本文的反馈。

原文链接:JMS Point-to-point messaging in Action