发送给接收方的消息通常需要根据某些标准进行过滤。JMS 提供消息选择器,允许 JMS 消费者根据消息头指定其感兴趣的消息。在本文中,您将学习使用 JMS 消息选择器来过滤消息。

这是一篇高级教程,在继续阅读本文之前,请确保您对 JMS 有基本的了解。如果您是 JMS 新手,请查看JMS 点对点消息传递模型文章。

为什么要使用消息选择器?

当一条消息被广播给许多接收者时,将标准放入订阅中以注册特定的兴趣是很有用的。这样只有感兴趣的消息才会被传递给订阅者。

  • 消息选择器是一种String基于SQL92条件表达式的语法。
  • 仅传递标题和属性与选择器匹配的消息。
  • 消息选择器不能引用消息正文值。
  • 消息选择器从左到右进行评估。
  • 选择器文字和运算符通常以大写字母书写,但它们不区分大小写。

1. 按属性过滤消息

就像我之前提到的,你只能根据属性或标头过滤消息,而不能根据实际消息内容过滤消息。下面的示例显示,只有级别日志被传递给消费者。链接至GitHub 代码库

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package lab04.message.filtering;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;

public class MessageSelectorExample {
  private static ConnectionFactory connectionFactory = null;
  private static Queue defaultQueue = null;
  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultQueue = CommonSettings.getDefaultQueue();
  }

  @Test
  public void messageFilterOnProperties() throws JMSException, InterruptedException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
      LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
      LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
      LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);

      //NOTE - If you keep "logLevel = DEBUG", it will not work!
      JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'DEBUG'");
 
      consumer.setMessageListener(msg -> {
        System.out.println(msg);
        try {
          LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
          System.out.println(event);
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });

      JMSProducer producer = jmsContext.createProducer();

      //send event1
      ObjectMessage objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event1);
      objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);

      //Send event2
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event2);
      objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);

      //Send event3
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event3);
      objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);
 
      //Send event4
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event4);
      objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);
      Thread.sleep(2000);
      consumer.close();
    }
  }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package lab04.message.filtering;

import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.StringJoiner;

public class LogEvent implements Serializable {
  //Timestamp is the primary Key
  private LocalDateTime id;

  //Log Body
  private String body;

  //Unique Server Id
  private String machineId;
  private LogLevel logLevel;
  public LogEvent() {
    super();
  }

  public LogEvent(String body, String machineId, LogLevel logLevel) {
    this.id = LocalDateTime.now();
    this.body = body;
    this.machineId = machineId;
    this.logLevel = logLevel;
  }
  public LocalDateTime getId() {
    return id;
  }
  public void setId(LocalDateTime id) {
    this.id = id;
  }
  public String getBody() {
    return body;
  }
  public void setBody(String body) {
    this.body = body;
  }
  public String getMachineId() {
    return machineId;
  }
  public void setMachineId(String machineId) {
    this.machineId = machineId;
  }
  public LogLevel getLogLevel() {
    return logLevel;
  }
  public void setLogLevel(LogLevel logLevel) {
    this.logLevel = logLevel;
  }
  @Override
  public String toString() {
    return new StringJoiner(", ", LogEvent.class.getSimpleName() + "[", "]")
        .add("id=" + id)
        .add("body='" + body + "'")
        .add("machineId='" + machineId + "'")
        .add("logLevel=" + logLevel)
        .toString();
  }
}
1
2
3
4
5
package lab04.message.filtering;

public enum LogLevel {
  TRACE, DEBUG, INFO, WARN, ERROR
}

输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
Class:			com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID():	ID:13-192.168.0.197(98:ca:f3:fc:e3:8f)-51690-1577423107544
getJMSTimestamp():	1577423107544
getJMSCorrelationID():	null
JMSReplyTo:		null
JMSDestination:		PTPQueue
getJMSDeliveryMode():	PERSISTENT
getJMSRedelivered():	false
getJMSType():		null
getJMSExpiration():	0
getJMSDeliveryTime():	0
getJMSPriority():	4
Properties:		{JMSXDeliveryCount=1, logLevel=DEBUG}
LogEvent[id=2019-12-27T13:05:07.512, body='Sample DEBUG Log',
machineId='08b7c29b-0934-4f3d-8ddf-c406ea795dee', logLevel=DEBUG]

2. 按标题过滤邮件

您已经了解了基于 JMS 属性的消息过滤。现在,请看下面的示例以了解基于 Header 的消息过滤。ERROR级别日志或优先级在 5 到 9 之间的消息将传递给消费者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package lab04.message.filtering;

import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;

public class MessageSelectorExample {
  private static ConnectionFactory connectionFactory = null;
  private static Queue defaultQueue = null;
  static {
    connectionFactory = CommonSettings.getConnectionFactory();
    defaultQueue = CommonSettings.getDefaultQueue();
  }

  @Test
  public void messageFilterOnHeader() throws JMSException, InterruptedException {
    try (JMSContext jmsContext = connectionFactory.createContext()) {
      LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
      LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
      LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
      LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
      JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9");
    
      consumer.setMessageListener(msg -> {
        System.out.println(msg);
        try {
          LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
          System.out.println(event);
        } catch (JMSException e) {
          e.printStackTrace();
        }
      });
 
      JMSProducer producer = jmsContext.createProducer();
 
      //send event1
      ObjectMessage objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event1);
      objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);
   
      //Send event2
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event2);
      objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
      producer.send(defaultQueue, objectMessage);

      //Send event3
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event3);
      objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
      producer.setPriority(5);
      producer.send(defaultQueue, objectMessage);

      //Send event4
      objectMessage = jmsContext.createObjectMessage();
      objectMessage.setObject(event4);
      objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());

      //Reset to normal priority
      producer.setPriority(4);
      producer.send(defaultQueue, objectMessage);
      Thread.sleep(2000);
      consumer.close();
    }
  }
}

输出

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Class:			com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID():	ID:12-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536655
getJMSTimestamp():	1577428536655
getJMSCorrelationID():	null
JMSReplyTo:		null
JMSDestination:		PTPQueue
getJMSDeliveryMode():	PERSISTENT
getJMSRedelivered():	false
getJMSType():		null
getJMSExpiration():	0
getJMSDeliveryTime():	0
getJMSPriority():	4
Properties:		{JMSXDeliveryCount=1, logLevel=ERROR}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample ERROR Log', 
machineId='7d743936-aae8-44a2-a527-cd88236b6ea6', logLevel=ERROR]

Class:			com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID():	ID:14-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536661
getJMSTimestamp():	1577428536661
getJMSCorrelationID():	null
JMSReplyTo:		null
JMSDestination:		PTPQueue
getJMSDeliveryMode():	PERSISTENT
getJMSRedelivered():	false
getJMSType():		null
getJMSExpiration():	0
getJMSDeliveryTime():	0
getJMSPriority():	5
Properties:		{JMSXDeliveryCount=1, logLevel=INFO}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample INFO Log', 
machineId='01ce3812-1869-4d23-96d8-cfc2ea6267c2', logLevel=INFO]

选择器规则

与任何查询语言一样,选择器由文字、标识符、表达式和运算符组成。我将在下面讨论每个规则以供参考。

1. 文字规则

  • 文字String用单引号引起来,单引号用两个单引号表示。例如,logLevel = 'ERROR'name='Ram''s'
  • 数值按原样显示,带有可选的+-符号。例如,range BETWEEN -200 AND +30。您也可以直接提及 ,30而不是+30
  • 数字也可以用科学计数法表示,例如等-7E36.2
  • JavaLongDouble也受支持。
  • 布尔值可以表示为TRUEFALSE

2. 标识符规则

  • 适用于 Java 标识符的规则也适用于 JMS 选择器标识符。这意味着_$可用于标识符。
  • 标识符不能包含 Java 关键字。
  • 标识符不能具有诸如NULL, TRUE, FALSE, NOT, AND, OR, BETWEEN, LIKE, IN, IS, 或者 ESCAPE
  • 标识符区分大小写。
  • 任何以 开头的名称JMSX都是 JMS 定义的属性。
  • 任何以 开头的名称JMS_都是提供商特定的属性。
  • 任何不包含 的名称(标识符)JMS都是应用程序定义的属性。

3. 运营商规则

  • ()此处适用包围的标准规则。
  • 逻辑运算符的优先顺序为:NOTANDOR
  • 按优先顺序排列的比较运算符:=>>=<<=<>(不等于运算符)。
  • 算术运算符的优先顺序如下:
    • 一元运算符 – +, -
    • 乘法和除法 – */
    • 加减 - +-

您了解了 JMS 消息选择器及其用于过滤传递给客户端的消息的方式。它在实际应用中非常有用,特别是从性能角度来看。您的应用程序可以将大量处理负载委托给 JMS 提供程序,并确保只向其传递必要的消息。

原文链接: JMS Message Selectors in Action to Filter Messages