[译]JMS 消息选择器在过滤消息中的应用

发送给接收方的消息通常需要根据某些标准进行过滤。JMS 提供消息选择器,允许 JMS 消费者根据消息头指定其感兴趣的消息。在本文中,您将学习使用 JMS 消息选择器来过滤消息。

这是一篇高级教程,在继续阅读本文之前,请确保您对 JMS 有基本的了解。如果您是 JMS 新手,请查看JMS 点对点消息传递模型文章。

为什么要使用消息选择器?

当一条消息被广播给许多接收者时,将标准放入订阅中以注册特定的兴趣是很有用的。这样只有感兴趣的消息才会被传递给订阅者。

1. 按属性过滤消息

就像我之前提到的,你只能根据属性或标头过滤消息,而不能根据实际消息内容过滤消息。下面的示例显示,只有级别日志被传递给消费者。链接至GitHub 代码库

package lab04.message.filtering;
import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;
public class MessageSelectorExample {
private static ConnectionFactory connectionFactory = null;
private static Queue defaultQueue = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultQueue = CommonSettings.getDefaultQueue();
}
@Test
public void messageFilterOnProperties() throws JMSException, InterruptedException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
//NOTE - If you keep "logLevel = DEBUG", it will not work!
JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'DEBUG'");
consumer.setMessageListener(msg -> {
System.out.println(msg);
try {
LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
System.out.println(event);
} catch (JMSException e) {
e.printStackTrace();
}
});
JMSProducer producer = jmsContext.createProducer();
//send event1
ObjectMessage objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event1);
objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event2
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event2);
objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event3
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event3);
objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event4
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event4);
objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
Thread.sleep(2000);
consumer.close();
}
}
}
package lab04.message.filtering;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.StringJoiner;
public class LogEvent implements Serializable {
//Timestamp is the primary Key
private LocalDateTime id;
//Log Body
private String body;
//Unique Server Id
private String machineId;
private LogLevel logLevel;
public LogEvent() {
super();
}
public LogEvent(String body, String machineId, LogLevel logLevel) {
this.id = LocalDateTime.now();
this.body = body;
this.machineId = machineId;
this.logLevel = logLevel;
}
public LocalDateTime getId() {
return id;
}
public void setId(LocalDateTime id) {
this.id = id;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public String getMachineId() {
return machineId;
}
public void setMachineId(String machineId) {
this.machineId = machineId;
}
public LogLevel getLogLevel() {
return logLevel;
}
public void setLogLevel(LogLevel logLevel) {
this.logLevel = logLevel;
}
@Override
public String toString() {
return new StringJoiner(", ", LogEvent.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("body='" + body + "'")
.add("machineId='" + machineId + "'")
.add("logLevel=" + logLevel)
.toString();
}
}
package lab04.message.filtering;
public enum LogLevel {
TRACE, DEBUG, INFO, WARN, ERROR
}

输出

Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:13-192.168.0.197(98:ca:f3:fc:e3:8f)-51690-1577423107544
getJMSTimestamp(): 1577423107544
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 4
Properties: {JMSXDeliveryCount=1, logLevel=DEBUG}
LogEvent[id=2019-12-27T13:05:07.512, body='Sample DEBUG Log',
machineId='08b7c29b-0934-4f3d-8ddf-c406ea795dee', logLevel=DEBUG]

2. 按标题过滤邮件

您已经了解了基于 JMS 属性的消息过滤。现在,请看下面的示例以了解基于 Header 的消息过滤。ERROR级别日志或优先级在 5 到 9 之间的消息将传递给消费者。

package lab04.message.filtering;
import labxx.common.settings.CommonSettings;
import org.junit.jupiter.api.Test;
import javax.jms.*;
import java.util.UUID;
public class MessageSelectorExample {
private static ConnectionFactory connectionFactory = null;
private static Queue defaultQueue = null;
static {
connectionFactory = CommonSettings.getConnectionFactory();
defaultQueue = CommonSettings.getDefaultQueue();
}
@Test
public void messageFilterOnHeader() throws JMSException, InterruptedException {
try (JMSContext jmsContext = connectionFactory.createContext()) {
LogEvent event1 = new LogEvent("Sample ERROR Log", UUID.randomUUID().toString(), LogLevel.ERROR);
LogEvent event2 = new LogEvent("Sample DEBUG Log", UUID.randomUUID().toString(), LogLevel.DEBUG);
LogEvent event3 = new LogEvent("Sample INFO Log", UUID.randomUUID().toString(), LogLevel.INFO);
LogEvent event4 = new LogEvent("Sample WARN Log", UUID.randomUUID().toString(), LogLevel.WARN);
JMSConsumer consumer = jmsContext.createConsumer(defaultQueue, "logLevel = 'ERROR' OR JMSPriority BETWEEN 5 AND 9");
consumer.setMessageListener(msg -> {
System.out.println(msg);
try {
LogEvent event = (LogEvent) ((ObjectMessage) msg).getObject();
System.out.println(event);
} catch (JMSException e) {
e.printStackTrace();
}
});
JMSProducer producer = jmsContext.createProducer();
//send event1
ObjectMessage objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event1);
objectMessage.setStringProperty("logLevel", event1.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event2
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event2);
objectMessage.setStringProperty("logLevel", event2.getLogLevel().name());
producer.send(defaultQueue, objectMessage);
//Send event3
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event3);
objectMessage.setStringProperty("logLevel", event3.getLogLevel().name());
producer.setPriority(5);
producer.send(defaultQueue, objectMessage);
//Send event4
objectMessage = jmsContext.createObjectMessage();
objectMessage.setObject(event4);
objectMessage.setStringProperty("logLevel", event4.getLogLevel().name());
//Reset to normal priority
producer.setPriority(4);
producer.send(defaultQueue, objectMessage);
Thread.sleep(2000);
consumer.close();
}
}
}

输出

Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:12-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536655
getJMSTimestamp(): 1577428536655
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 4
Properties: {JMSXDeliveryCount=1, logLevel=ERROR}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample ERROR Log',
machineId='7d743936-aae8-44a2-a527-cd88236b6ea6', logLevel=ERROR]
Class: com.sun.messaging.jmq.jmsclient.ObjectMessageImpl
getJMSMessageID(): ID:14-192.168.0.197(b8:3d:5e:24:df:26)-53719-1577428536661
getJMSTimestamp(): 1577428536661
getJMSCorrelationID(): null
JMSReplyTo: null
JMSDestination: PTPQueue
getJMSDeliveryMode(): PERSISTENT
getJMSRedelivered(): false
getJMSType(): null
getJMSExpiration(): 0
getJMSDeliveryTime(): 0
getJMSPriority(): 5
Properties: {JMSXDeliveryCount=1, logLevel=INFO}
LogEvent[id=2019-12-27T14:35:36.623, body='Sample INFO Log',
machineId='01ce3812-1869-4d23-96d8-cfc2ea6267c2', logLevel=INFO]

选择器规则

与任何查询语言一样,选择器由文字、标识符、表达式和运算符组成。我将在下面讨论每个规则以供参考。

1. 文字规则

2. 标识符规则

3. 运营商规则

您了解了 JMS 消息选择器及其用于过滤传递给客户端的消息的方式。它在实际应用中非常有用,特别是从性能角度来看。您的应用程序可以将大量处理负载委托给 JMS 提供程序,并确保只向其传递必要的消息。

原文链接: JMS Message Selectors in Action to Filter Messages

[译]JMS 消息模型
[译]JMS 点对点消息传递的实际应用