[译]JMS 消息选择器在过滤消息中的应用
发送给接收方的消息通常需要根据某些标准进行过滤。JMS 提供消息选择器,允许 JMS 消费者根据消息头指定其感兴趣的消息。在本文中,您将学习使用 JMS 消息选择器来过滤消息。
这是一篇高级教程,在继续阅读本文之前,请确保您对 JMS 有基本的了解。如果您是 JMS 新手,请查看JMS 点对点消息传递模型文章。
为什么要使用消息选择器?
当一条消息被广播给许多接收者时,将标准放入订阅中以注册特定的兴趣是很有用的。这样只有感兴趣的消息才会被传递给订阅者。
- 消息选择器是一种
String基于SQL92条件表达式的语法。 - 仅传递标题和属性与选择器匹配的消息。
- 消息选择器不能引用消息正文值。
- 消息选择器从左到右进行评估。
- 选择器文字和运算符通常以大写字母书写,但它们不区分大小写。
1. 按属性过滤消息
就像我之前提到的,你只能根据属性或标头过滤消息,而不能根据实际消息内容过滤消息。下面的示例显示,只有级别日志被传递给消费者。链接至GitHub 代码库。
package lab04.message.filtering;
import labxx.common.settings.CommonSettings;import org.junit.jupiter.api.Test;import javax.jms.*;import java.util.UUID;
public class MessageSelectorExample { private static ConnectionFactory connectionFactory = null; private static Queue defaultQueue = null; static { connectionFactory = CommonSettings.getConnectionFactory(); defaultQueue = CommonSettings.getDefaultQueue(); }
@Test public void messageFilterOnProperties() throws JMSException, InterruptedException { try (JMSContext jmsContext = connectionFactory.createContext()) { LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR); LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG); LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO); LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
//NOTE - If you keep "logLevel = DEBUG", it will not work! JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'DEBUG'");
consumer.setMessageListener(msg -> { System.out.println(msg); try { LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject(); System.out.println(event); } catch (JMSException e) { e.printStackTrace(); } });
JMSProducer producer = jmsContext.createProducer();
//send event1 ObjectMessage objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event1); objectMessage.setStringProperty("logLevel", event1.getLogLevel().name()); producer.send(defaultQueue, objectMessage);
//Send event2 objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event2); objectMessage.setStringProperty("logLevel", event2.getLogLevel().name()); producer.send(defaultQueue, objectMessage);
//Send event3 objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event3); objectMessage.setStringProperty("logLevel", event3.getLogLevel().name()); producer.send(defaultQueue, objectMessage);
//Send event4 objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event4); objectMessage.setStringProperty("logLevel", event4.getLogLevel().name()); producer.send(defaultQueue, objectMessage); Thread.sleep(2000); consumer.close(); } }}package lab04.message.filtering;
import java.io.Serializable;import java.time.LocalDateTime;import java.util.StringJoiner;
public class LogEvent implements Serializable { //Timestamp is the primary Key private LocalDateTime id;
//Log Body private String body;
//Unique Server Id private String machineId; private LogLevel logLevel; public LogEvent() { super(); }
public LogEvent(String body, String machineId, LogLevel logLevel) { this.id = LocalDateTime.now(); this.body = body; this.machineId = machineId; this.logLevel = logLevel; } public LocalDateTime getId() { return id; } public void setId(LocalDateTime id) { this.id = id; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } public String getMachineId() { return machineId; } public void setMachineId(String machineId) { this.machineId = machineId; } public LogLevel getLogLevel() { return logLevel; } public void setLogLevel(LogLevel logLevel) { this.logLevel = logLevel; } @Override public String toString() { return new StringJoiner(", ", LogEvent.class.getSimpleName() + "[", "]") .add("id=" + id) .add("body='" + body + "'") .add("machineId='" + machineId + "'") .add("logLevel=" + logLevel) .toString(); }}package lab04.message.filtering;
public enum LogLevel { TRACE, DEBUG, INFO, WARN, ERROR}输出
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImplgetJMSMessageID(): ID:13-192.168.0.197(98:ca:f3:fc:e3:8f)-51690-1577423107544getJMSTimestamp(): 1577423107544getJMSCorrelationID(): nullJMSReplyTo: nullJMSDestination: PTPQueuegetJMSDeliveryMode(): PERSISTENTgetJMSRedelivered(): falsegetJMSType(): nullgetJMSExpiration(): 0getJMSDeliveryTime(): 0getJMSPriority(): 4Properties: {JMSXDeliveryCount=1, logLevel=DEBUG}LogEvent[id=2019-12-27T13:05:07.512, body='Sample DEBUG Log',machineId='08b7c29b-0934-4f3d-8ddf-c406ea795dee', logLevel=DEBUG]2. 按标题过滤邮件
您已经了解了基于 JMS 属性的消息过滤。现在,请看下面的示例以了解基于 Header 的消息过滤。ERROR级别日志或优先级在 5 到 9 之间的消息将传递给消费者。
package lab04.message.filtering;
import labxx.common.settings.CommonSettings;import org.junit.jupiter.api.Test;import javax.jms.*;import java.util.UUID;
public class MessageSelectorExample { private static ConnectionFactory connectionFactory = null; private static Queue defaultQueue = null; static { connectionFactory = CommonSettings.getConnectionFactory(); defaultQueue = CommonSettings.getDefaultQueue(); }
@Test public void messageFilterOnHeader() throws JMSException, InterruptedException { try (JMSContext jmsContext = connectionFactory.createContext()) { LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR); LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG); LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO); LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN); JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9");
consumer.setMessageListener(msg -> { System.out.println(msg); try { LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject(); System.out.println(event); } catch (JMSException e) { e.printStackTrace(); } });
JMSProducer producer = jmsContext.createProducer();
//send event1 ObjectMessage objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event1); objectMessage.setStringProperty("logLevel", event1.getLogLevel().name()); producer.send(defaultQueue, objectMessage);
//Send event2 objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event2); objectMessage.setStringProperty("logLevel", event2.getLogLevel().name()); producer.send(defaultQueue, objectMessage);
//Send event3 objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event3); objectMessage.setStringProperty("logLevel", event3.getLogLevel().name()); producer.setPriority(5); producer.send(defaultQueue, objectMessage);
//Send event4 objectMessage = jmsContext.createObjectMessage(); objectMessage.setObject(event4); objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
//Reset to normal priority producer.setPriority(4); producer.send(defaultQueue, objectMessage); Thread.sleep(2000); consumer.close(); } }}输出
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImplgetJMSMessageID(): ID:12-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536655getJMSTimestamp(): 1577428536655getJMSCorrelationID(): nullJMSReplyTo: nullJMSDestination: PTPQueuegetJMSDeliveryMode(): PERSISTENTgetJMSRedelivered(): falsegetJMSType(): nullgetJMSExpiration(): 0getJMSDeliveryTime(): 0getJMSPriority(): 4Properties: {JMSXDeliveryCount=1, logLevel=ERROR}LogEvent[id=2019-12-27T14:35:36.623, body='Sample ERROR Log',machineId='7d743936-aae8-44a2-a527-cd88236b6ea6', logLevel=ERROR]
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImplgetJMSMessageID(): ID:14-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536661getJMSTimestamp(): 1577428536661getJMSCorrelationID(): nullJMSReplyTo: nullJMSDestination: PTPQueuegetJMSDeliveryMode(): PERSISTENTgetJMSRedelivered(): falsegetJMSType(): nullgetJMSExpiration(): 0getJMSDeliveryTime(): 0getJMSPriority(): 5Properties: {JMSXDeliveryCount=1, logLevel=INFO}LogEvent[id=2019-12-27T14:35:36.623, body='Sample INFO Log',machineId='01ce3812-1869-4d23-96d8-cfc2ea6267c2', logLevel=INFO]选择器规则
与任何查询语言一样,选择器由文字、标识符、表达式和运算符组成。我将在下面讨论每个规则以供参考。
1. 文字规则
- 文字
String用单引号引起来,单引号用两个单引号表示。例如,logLevel = 'ERROR',name='Ram''s'。 - 数值按原样显示,带有可选的
+或-符号。例如,range BETWEEN -200 AND +30。您也可以直接提及 ,30而不是+30。 - 数字也可以用科学计数法表示,例如等
-7E3。6.2 - Java
Long和Double也受支持。 - 布尔值可以表示为
TRUE和FALSE。
2. 标识符规则
- 适用于 Java 标识符的规则也适用于 JMS 选择器标识符。这意味着
_和$可用于标识符。 - 标识符不能包含 Java 关键字。
- 标识符不能具有诸如
NULL,TRUE,FALSE,NOT,AND,OR,BETWEEN,LIKE,IN,IS, 或者ESCAPE - 标识符区分大小写。
- 任何以 开头的名称
JMSX都是 JMS 定义的属性。 - 任何以 开头的名称
JMS_都是提供商特定的属性。 - 任何不包含 的名称(标识符)
JMS都是应用程序定义的属性。
3. 运营商规则
()此处适用包围的标准规则。- 逻辑运算符的优先顺序为:
NOT,AND,OR。 - 按优先顺序排列的比较运算符:
=,>,>=,<,<=,<>(不等于运算符)。 - 算术运算符的优先顺序如下:
- 一元运算符 –
+,-。 - 乘法和除法 –
*,/。 - 加减 -
+,-
- 一元运算符 –
您了解了 JMS 消息选择器及其用于过滤传递给客户端的消息的方式。它在实际应用中非常有用,特别是从性能角度来看。您的应用程序可以将大量处理负载委托给 JMS 提供程序,并确保只向其传递必要的消息。