消息是 JMS 的核心,无论是事件还是业务数据。消息由标头、自定义属性和正文组成。这是您将在 JMS 消息模型(JMS 消息剖析)中学习的内容。

JMS 消息模型

JMS 消息由 Header、Properties 和 Body(Payload)组成。我们将在下面详细讨论它们。

  • 消息头是有关消息的元数据。消息头有两种类型:开发人员设置的头和 JMS 提供商设置的头。

  • 消息属性是用于向消息添加可选标头字段的附加标头字段。属性字段基本上有 3 种类型。

    • 应用程序特定属性 - 允许设置应用程序特定的自定义标题。
    • 标准属性 – JMS 定义的标准属性。
    • JMS-Provider 特定属性。
  • 消息主体是发送到目的地的有效负载。JMS 支持多种类型(格式)的有效负载。

1. 消息头字段

一些消息头由程序员设置,一些由 JMS 提供商内部设置。消息头是 JMS 消息模型中首先要探讨的内容。

1.1. JMS 目标

此标头包含消息发送的目的地。收到消息时,目的地必须与分配的值相等。

1
2
javax.jms.Message#setJMSDestination(Destination)
javax.jms.Message#getJMSDestination()

1.2. JMS交付模式

JMS 支持 PERSISTEN T和 NON_PERSISTENT 消息传输模式。此标头表示传递模式。

1
2
3
javax.jms.Message#setJMSDeliveryModeint
javax.jms.Message#getJMSDeliveryMode()
javax.jms.DeliveryMode

1.3. JMS消息ID

此标头字段值唯一标识提供商发送的每个消息。它是一个字符串值。程序员不需要处理它。

1
2
3
javax.jms.Message#getJMSMessageID()
javax.jms.Message#setJMSMessageID(String)
javax.jms.MessageProducer#setDisableMessageID(boolean)

1.4. JMS时间戳

消息被交给提供商处理的时间戳。

1
2
javax.jms.Message#setJMSTimestamp(long)
javax.jms.MessageProducer#setDisableMessageTimestamp(boolean)

1.5. JMS重新交付

当客户端收到带有此标头的消息时,很可能该消息已在过去送达,但未收到确认。程序员无法更改此值。

1
2
javax.jms.Message#setJMSRedeliveredboolean
javax.jms.Message#getJMSRedelivered()

1.6. JMSExpiration – 设置消息生存时间

JMS 通过添加生存时间值来为每条消息设置过期值。生存时间值在方法中指定send()。如果生存时间值设置为零,则 JMSExpiration 标头值也会变为零,这意味着消息不会过期。

1
2
3
jmsProducer.setTimeToLive(long timeToLive);
javax.jms.Message#setJMSExpiration(long)
javax.jms.Message#getJMSExpiration()

下面的示例演示了如何使用producer.setTimeToLive()方法来设置消息过期时间。正如您在输出中看到的那样,消息的生命周期只有 2 秒,5 秒后它将从队列中删除。因此,消费者不会收到该消息并记录 NullPointer 异常。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package lab01.message.model;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class MessageExpirationTest {
  public static void main(String[] args) throws InterruptedException, JMSException {
    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();

    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      producer.setTimeToLive(2000);
      producer.send(queue, "This message will expire soon");

      Thread.sleep(5000);

      JMSConsumer consumer = jmsContext.createConsumer(queue);
      TextMessage message = (TextMessage) consumer.receive(10000);
      System.out.println(message.getText());
    }
  }
}

输出

1
Exception in thread "main" java.lang.NullPointerException

1.7. JMSPriority 标头

JMS 定义了 10 个优先级别,从 0(最低)到 9(最高)。应用程序客户端应将 0 到 4 视为普通优先级消息,将 5 到 9 视为加急消息。加急消息优先于普通优先级消息传送。

1
jmsProducer.setPriority(int priority);

更多讨论请参阅**优先考虑 JMS 消息教程**。

1.8. JMSDeliveryTime – 消息传递延迟

表示投递时间。这意味着消息将在此标头所表示的时间或之后投递。通过将投递时间与指定的投递延迟相加来计算。

1
jmsProducer.setDeliveryDelay(long deliveryDelay);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
package lab01.message.model;

import labxx.common.settings.CommonSettings;
import javax.jms.*;

public class MessageDelayDelivery {
  public static void main(String[] args) throws JMSException {
    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();

    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      producer.setDeliveryDelay(3000);//delivered after 3 seconds
      producer.send(queue, "Message with a delay");

      JMSConsumer consumer = jmsContext.createConsumer(queue);
      TextMessage textMsg = (TextMessage) consumer.receive(5000);//Wait for 5 seconds
      System.out.println("Received message: " + textMsg.getText());
    }
  }
}

当您运行该示例时,您将看到消息在 3 秒后送达。

1.9. JMS关联ID

客户端使用此标头字段将一条消息与另一条消息链接起来。在请求-响应场景中很有用。

1
2
jmsProducer.setJMSCorrelationID(String correlationID)
jmsProducer.setJMSCorrelationIDAsBytes(byte[])

我将在本教程的后半部分讨论在请求-响应场景中使用 CorrelationID。

1.10. JMS类型

表示客户端在消息中设置的消息类型 ID。在少数 JMS 提供商中很有用。

1
jmsProducer.setJMSType(String type);

1.11. JMSReplyTo

此标头包含应发送消息回复的目的地(队列或主题)。

1
2
3
jmsProducer.setJMSReplyTo(Destination replyTo);
javax.jms.Message#setJMSReplyTo(Destination)
javax.jms.Message#getJMSReplyTo()

您将在本教程的后半部分学习 JMSReplyTo 标头的使用。

2. 消息属性

如前所述,消息属性是向消息添加可选标头字段的附加字段。消息属性有 3 种类型:自定义属性、JMS 定义属性和可选 JMS 提供程序特定属性。

2.1. 自定义属性

  • 属性名称(键)应遵循 java 标识符规则。
  • 属性值可以是booleanbyteshortint、和。longfloatdoubleString

下面的示例演示了消息中自定义属性的用法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package lab01.message.model;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.Enumeration;
import java.util.UUID;

public class MessageCustomProperties {
  @Test
  public void test()
      throws JMSException {
    ConnectionFactory connectionFactory = CommonSettings.getConnectionFactory();
    Queue queue = CommonSettings.getDefaultQueue();

    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      TextMessage message = jmsContext.createTextMessage("Message with a custom property");
      message.setBooleanProperty("priorityUser", true);
      message.setStringProperty("authToken", UUID.randomUUID().toString());
      producer.send(queue, message);
 
      JMSConsumer consumer = jmsContext.createConsumer(queue);
  
      //Delay for testing purpose Only
      TextMessage textMsg = (TextMessage) consumer.receive(1000);
      Enumeration customProperties = textMsg.getPropertyNames();
      while (customProperties.hasMoreElements()) {
        System.out.println(customProperties.nextElement());
      }
      System.out.println("priorityUser: " + textMsg.getBooleanProperty("priorityUser"));
      System.out.println("authToken: " + textMsg.getStringProperty("authToken"));
      System.out.println("Received message: " + textMsg.getText());
    }
  }
}

输出

1
2
3
4
5
6
JMSXDeliveryCount
priorityUser
authToken
priorityUser: true
authToken: 010a908f-a3a6-4a2b-8d9f-5e177f1c223b
Received message: Message with a custom property

您可以迭代各种消息属性,如上例所示(第 28-31 行)。

2.2. JMS 定义的属性

所有 JMS 定义的属性都有JMSX属性名前缀。下面是属性列表、属性类型、JMS 必须实现的可选或强制属性及其用途。

1. JMSXUserID — 字符串 — 可选 — 由提供商在发送时设置, 用于识别发送消息的用户。

2. JMSXAppID — 字符串 — 可选 — 由提供商在发送时设置 这是发送消息的应用程序的身份。

3. JMSXDeliveryCount – int – 强制 – 由提供商在接收时设置 消息传递尝试的次数。

4. JMSXGroupID – 字符串 – 可选 – 由客户端设置 此消息所属消息组的标识。

5. JMSXGroupSeq – int – 可选 – 由客户端设置 此消息在组内的序列号。

6. JMSXProducerTXID — 字符串 — 可选 — 由提供商在发送 生成消息的事务 ID 时设置。

7. JMSXConsumerTXID — 字符串 — 可选 — 由提供商在接收 使用消息的交易 ID 时设置。

8. JMSXRcvTimestamp — long — 可选 — 由提供商在接收时设置 JMS 将消息传递给消费者的时间。

9. JMSXState – int – 可选 – 由提供商设置 消息是否处于等待、就绪、过期或保留状态。没有 API 可供程序员检查这一点。

2.3. JMS 提供程序特定属性

JMSJMS_<vendor_name>为特定于供应商的属性保留了属性名称前缀。这些特定于供应商的属性适用于本机供应商/提供商客户端。程序员不能在 JMS 代码中使用。

3. JMS 消息体 – JMS 中的 5 种消息类型

最后,在本次 JMS 消息模型讨论中,我们将讨论实际的消息内容。JMS 提供了 5 种形式的消息类型。

1. StreamMessage

消息主体包含 Java 原始值流。

1
2
3
4
StreamMessage message = jmsContext.createStreamMessage();
message.writeString("String Content");

StreamMessage receivedMessage = (StreamMessage) consumer.receive();

2. MapMessage

消息主体包含键值对,键是字符串对象,值是 Java 原始类型。

1
2
3
4
MapMessage message = jmsContext.createMapMessage();
message.setString("sampleKey", "sampleValue");

MapMessage receivedMessage = (MapMessage) consumer.receive();

**3. **TextMessage

正文包含字符串的消息。

1
2
3
TextMessage message = jmsContext.createTextMessage("Test TextMessage Type");

TextMessage receivedMessage = (TextMessage) jmsContext.createConsumer(queue).receive();

4. ObjectMessage

包含可序列化 Java 对象的消息。

1
2
3
4
ObjectMessage message = jmsContext.createObjectMessage();
message.setObject(new UserCommand("John", "[email protected]"));

ObjectMessage receivedMessage = (ObjectMessage) consumer.receive();

5. BytesMessage

包含未解释的字节流的消息。

1
2
3
4
BytesMessage message = jmsContext.createBytesMessage();
message.writeUTF("नमस्ते");

BytesMessage receivedMessage = (BytesMessage) consumer.receive();

以下代码从高层次演示了各种消息类型的工作方式。您还可以探索更多方法。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package lab01.message.model;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import javax.jms.*;
import java.io.Serializable;
import java.util.StringJoiner;
import java.util.UUID;

/**
 * Test - TextMessage, ByteMessage, ObjectMessage, StreamMessage, MapMessage.
 */
public class MessageTypesTest {
  private static ConnectionFactory connectionFactory = null;
  private static Queue queue = null;

  @BeforeAll
  public static void setUp() {
    connectionFactory = CommonSettings.getConnectionFactory();
    queue = CommonSettings.getDefaultQueue();
  }

  /**
   * @throws JMSException
   */
  @Test
  public void testTextMessage() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      TextMessage message = jmsContext.createTextMessage("Test TextMessage Type");
      producer.send(queue, message);
      TextMessage receivedMessage = (TextMessage) jmsContext.createConsumer(queue).receive();
      System.out.println("Received message: " + receivedMessage.getText());
    }
  }

  @Test
  public void testByteMessage() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      BytesMessage message = jmsContext.createBytesMessage();
      message.writeUTF("नमस्ते");
      message.writeBoolean(true);
      message.writeLong(12345L);
      producer.send(queue, message);
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      BytesMessage receivedMessage = (BytesMessage) consumer.receive();
      System.out.println("==== ByteMessage Demo ====");
      System.out.println(receivedMessage.readUTF());
      System.out.println(receivedMessage.readBoolean());
      System.out.println(receivedMessage.readLong());
    }
  }

  @Test
  public void testStreamMessage() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      StreamMessage message = jmsContext.createStreamMessage();
      message.writeString("String Content");
      message.writeString("Another string");
      message.writeInt(101);
      producer.send(queue, message);
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      StreamMessage receivedMessage = (StreamMessage) consumer.receive();
      System.out.println("===== StreamMessage =====");
      System.out.println(receivedMessage.readString());
      System.out.println(receivedMessage.readString());
      System.out.println(receivedMessage.readInt());
    }
  }

  @Test
  public void testMapMessage() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      MapMessage message = jmsContext.createMapMessage();
      message.setString("sampleKey", "sampleValue");
      producer.send(queue, message);
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      MapMessage receivedMessage = (MapMessage) consumer.receive();
      System.out.println("===== MapMessage Demo =====");
      System.out.println(receivedMessage.getString("sampleKey"));
    }
  }

  @Test
  public void testObjectMessage() throws JMSException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      JMSProducer producer = jmsContext.createProducer();
      ObjectMessage message = jmsContext.createObjectMessage();
      message.setObject(new UserCommand("John", "[email protected]"));
      producer.send(queue, message);
      JMSConsumer consumer = jmsContext.createConsumer(queue);
      ObjectMessage receivedMessage = (ObjectMessage) consumer.receive();
      System.out.println("===== ObjectMessage Demo =====");
      System.out.println(receivedMessage.getObject());
    }
  }

  private static class UserCommand implements Serializable {
    private String id;
    private String name;
    private String email;
    public UserCommand(String name, String email) {
      id = UUID.randomUUID().toString();
      this.name = name;
      this.email = email;
    }

    @Override
    public String toString() {
      return new StringJoiner(", ", UserCommand.class.getSimpleName() + "[", "]")
          .add("id='" + id + "'")
          .add("name='" + name + "'")
          .add("email='" + email + "'")
          .toString();
    }
  }
}

输出

您的输出可能会略有不同,但结果应该相似。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
===== StreamMessage =====
String Content
Another string
101

===== ObjectMessage Demo =====
UserCommand[id='e5e80542-7201-4418-bced-064d49bb5b8f', name='John', email='[email protected]']

Received message: Test TextMessage Type

===== MapMessage Demo =====
sampleValue

==== ByteMessage Demo ====
नमस्ते
true
12345

在本教程中,我通过示例简要讨论了 JMS 消息模型。请在下面的评论中告诉我您的反馈或问题。我将在单独的文章中讨论**消息优先级消息** 请求-响应

原文链接:JMS Message Model