[译]JMS 2.0 中的新增功能(第二部分)- 新的消息传送功能
原文链接:https://www.oracle.com/technical-resources/articles/java/jms2messaging.html
作者:Nigel Deakin 出版日期:2013 年 5 月
了解如何利用 JMS 2.0 中的新消息传递功能。
本文是两部分系列文章中的第二部分,介绍了 Java 消息服务 (JMS) 2.0 中引入的一些新消息传递功能。本文假设读者对 JMS 1.1 有基本的了解。
在第一部分中,我们了解了 JMS 2.0 中引入的新的易用特性。在这里,我们来了解一下重要的新消息传递特性。
JMS 2.0 于 2013 年 4 月发布,这是自 2002 年发布 1.1 版以来 JMS 规范的首次更新。人们可能会认为,一个长期保持不变的 API 已经变得毫无生机和无人使用。但是,如果根据不同实现的数量来判断 API 标准的成功,JMS 是最成功的 API 之一。
JMS 2.0 的重点是跟上近年来对其他企业 Java 技术所做的易用性改进。同时还借此机会引入了许多新的消息传递功能。
JMS 2.0 是 Java EE 7 平台的一部分,可用于 Java EE Web 或 EJB 应用程序。它也可以单独用于 Java SE 环境中。正如我下面所解释的那样,某些功能仅在独立环境中可用,而其他功能仅在 Java EE Web 或 EJB 应用程序中可用。
这里我们讨论 JMS 2.0 中的五个重要的新消息传递功能。
允许多个消费者订阅同一主题
在 JMS 1.1 中,一个主题的订阅不允许同时有多个使用者。这意味着,处理主题订阅上的消息的工作不能在多个线程、连接或 Java 虚拟机 (JVM) 之间共享,从而限制了应用程序的可伸缩性。JMS 2.0 中引入了一种称为共享订阅的新主题订阅,从而消除了此限制。
让我们回顾一下主题订阅在 JMS 1.1 中的工作方式。在清单 1 中,方法createConsumer
onSession
用于创建指定主题的非持久订阅(我们稍后将讨论持久订阅):
private void createUnsharedConsumer(ConnectionFactory connectionFactory, Topic topic)
throws JMSException {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(topic);
connection.start();
Message message = messageConsumer.receive(10000);
while (message != null) {
System.out.println("Message received: " + ((TextMessage) message).getText());
message = messageConsumer.receive(10000);
}
connection.close();
}
清单 1
在清单 1 中,消费者将收到发送到该主题的每条消息的副本。但是,如果应用程序处理每条消息需要很长时间,该怎么办?我们如何通过在两个 JVM 之间分担处理这些消息的工作(例如,一个 JVM 处理一些消息,另一个 JVM 处理剩余消息)来提高应用程序的可扩展性?
在 JMS 1.1 中,无法在普通 Java SE 应用程序中执行此操作。(在 Java EE 中,您可以使用消息驱动 bean [MDB] 池来执行此操作)。如果您createConsumer
在单独的 JVM(或同一 JVM 上的单独线程)中创建第二个使用者,则每个使用者将使用单独的订阅,因此它将收到主题收到的每条消息的副本。这不是我们想要的。如果您将“订阅”视为接收发送到主题的每条消息的副本的逻辑实体,那么我们希望两个使用者使用相同的订阅。
JMS 2.0 提供了一种解决方案。您可以使用新方法创建“共享”非持久订阅:createSharedConsumer
。此方法在 上Session
(对于使用传统 API 的应用程序)和 上JMSContext
(对于使用简化 API 的应用程序)都可用。由于两个 JVM 需要能够识别它们需要共享的订阅,因此它们需要提供一个名称来标识共享订阅,如清单 2 所示。
private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createSharedConsumer(topic,"mySubscription");
connection.start();
Message message = messageConsumer.receive(10000);
while (message != null) {
System.out.println("Message received: " + ((TextMessage) message).getText());
message = messageConsumer.receive(10000);
}
connection.close();
}
清单 2
如果您在两个单独的 JVM 中运行清单 2 中的代码,则发送到主题的每条消息都将传递给两个消费者中的一个。这允许他们共享处理来自订阅的消息的工作。
createDurableSubscriber
使用持久订阅的应用程序也可以使用相同的功能。在 JMS 1.1 中,使用on方法创建持久订阅Session
:
MessageConsumer messageConsumer = session.createDurableSubscription(topic,"myDurableSub");
这将创建一个myDurableSub
针对指定主题的持久订阅。但是,与以前一样,无法在两个 JVM 之间或同一 JVM 上的两个线程之间共享处理此持久订阅上的消息的工作。根据您尝试执行的操作,您将获得一个JMSException
或两个不同的订阅。
JMS 2.0 再次为这个问题提供了解决方案。现在,您可以使用新方法创建“共享”持久订阅。此方法在 上(对于使用传统 API 的应用程序)和 上(对于使用简化 API 的应用程序)createSharedDurableConsumer
都可用。Session``JMSContext
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic,"myDurableSub");
总而言之,JMS 1.1 定义了两种不同类型的主题订阅,而 JMS 2.0 定义了四种类型,所有这些类型都可以使用经典或简化的 API 来创建:
- 非共享非持久订阅。这些订阅在 JMS 1.1 和 JMS 2.0 中均可用,并且使用 创建
createConsumer
。它们只能有一个消费者。设置客户端标识符是可选的。 - 非共享持久订阅。这些订阅在 JMS 1.1 和 JMS 2.0 中均可用,使用或创建
createDurableSubscriber
(仅在 JMS 2.0 中)createDurableConsumer
。它们只能有一个消费者。必须设置客户端标识符,订阅由订阅名称和客户端标识符的组合来标识。 - 共享非持久订阅。这些订阅仅在 JMS 2.0 中可用,并且使用 创建
createSharedConsumer
。它们可以有任意数量的消费者。设置客户端标识符是可选的。订阅由订阅名称和客户端标识符(如果已设置)的组合标识。 - 共享持久订阅。这些仅在 JMS 2.0 中可用,并且使用 创建
createSharedDurableConsumer
。它们可以有任意数量的消费者。设置客户端标识符是可选的。订阅由订阅名称和客户端标识符(如果已设置)的组合标识。
配送延迟
您现在可以指定消息的传送延迟。JMS 提供程序将在指定的传送延迟过去后才传送消息。
如果您使用的是经典 API,则需要在发送消息之前通过调用来设置传送延迟(以毫秒为单位),如清单 3 所示setDeliveryDelay
。MessageProducer
private void sendWithDeliveryDelayClassic(ConnectionFactory connectionFactory,Queue queue)
throws JMSException {
// send a message with a delivery delay of 20 seconds
try (Connection connection = connectionFactory.createConnection();){
Session session = con.createSession();
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.setDeliveryDelay(20000);
TextMessage textMessage = session.createTextMessage("Hello world");
messageProducer.send(textMessage);
}
}
清单 3
如果您使用的是简化 API,则需要setDeliveryDelay
在JMSProducer
发送消息之前调用。此方法返回对象JMSProducer
,它允许您在同一行中创建 JMSProducer、设置交付延迟和发送消息,如清单 4 所示。
private void sendWithDeliveryDelaySimplified(ConnectionFactory connectionFactory,Queue queue)
throws JMSException {
// send a message with a delivery delay of 20 seconds
try (JMSContext context = connectionFactory.createContext();){
context.createProducer().setDeliveryDelay(20000).send(queue,"Hello world");
}
}
清单 4
异步发送消息
JMS 2.0 的另一个新特性是能够异步发送消息。
此功能适用于在 Java SE 或 Java EE 应用程序客户端容器中运行的应用程序。不适用于在 Java EE Web 或 EJB 容器中运行的应用程序。
通常,发送持久消息时,send 方法直到 JMS 客户端将消息发送到服务器并收到回复以通知客户端该消息已被安全接收并持久化后才会返回。我们称此为同步发送。
JMS 2.0 引入了执行异步发送的功能。当异步发送消息时,发送方法会将消息发送到服务器,然后将控制权返回给应用程序,而无需等待服务器的回复。应用程序可以做一些有用的事情,例如发送另一条消息或执行某些处理,而不是在 JMS 客户端等待回复时被无谓地阻塞。
当收到服务器的回复以表明消息已被服务器接收并持久保存时,JMS 提供程序将通过调用onCompletion
应用程序指定的CompletionListener
对象上的回调方法来通知应用程序。
在应用程序中使用异步发送的主要方式有两种
- 允许应用程序在等待服务器回复的时间间隔内执行其他操作(如更新显示或写入数据库)
- 允许连续发送大量消息,而无需在每条消息后等待服务器的回复
清单 5 是一个示例,说明如何使用经典 API 实现其中的第一个:
private void asyncSendClassic(ConnectionFactory connectionFactory,Queue queue)
throws Exception {
// send a message asynchronously
try (Connection connection = connectionFactory.createConnection();){
Session session = connection.createSession();
MessageProducer messageProducer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage("Hello world");
CountDownLatch latch = new CountDownLatch(1);
MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
messageProducer.send(textMessage,new MyCompletionListener(latch));
System.out.println("Message sent, now waiting for reply");
// at this point we can do something else before waiting for the reply
// this is not shown here
// now wait for the reply from the server
latch.await();
if (myCompletionListener.getException()==null){
System.out.println("Reply received from server");
} else {
throw myCompletionListener.getException();
}
}
}
清单 5
清单 5 中使用的类MyCompletionListener
是应用程序提供的单独类,它实现了接口javax.jms.CompletionListener
,如清单 6 所示:
class MyCompletionListener implements CompletionListener {
CountDownLatch latch;
Exception exception;
public MyCompletionListener(CountDownLatch latch) {
this.latch=latch;
}
@Override
public void onCompletion(Message message) {
latch.countDown();
}
@Override
public void onException(Message message, Exception exception) {
latch.countDown();
this.exception=exception;
}
public Exception getException(){
return exception;
}
}
清单 6
在清单 6 中,我们使用一种新方法MessageProducer
发送消息,而无需等待服务器的回复。这就是send(Message message, CompletionListener listener)
。使用此方法发送消息允许应用程序在服务器处理消息时执行其他操作。当应用程序准备继续时,它使用java.util.concurrent.CountDownLatch
等待,直到收到来自服务器的回复。收到回复后,应用程序可以继续执行,并且对消息已成功发送的信心程度与正常同步发送后相同。
如果使用 JMS 2.0 简化 API,异步发送消息会稍微简单一些,如清单 7 所示:
private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue)
throws Exception {
// send a message asynchronously
try (JMSContext context = connectionFactory.createContext();){
CountDownLatch latch = new CountDownLatch(1);
MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world");
System.out.println("Message sent, now waiting for reply");
// at this point we can do something else before waiting for the reply
// this is not shown here
latch.await();
if (myCompletionListener.getException()==null){
System.out.println("Reply received from server");
} else {
throw myCompletionListener.getException();
}
}
}
清单 7
在这种情况下,在调用之前setAsync(CompletionListener listener)
调用该方法。由于支持方法链,您可以在同一行上执行这两项操作。JMSProducer``send(Message message)``JMSProducer
JMSXDeliveryCount
JMS 2.0 允许接收消息的应用程序确定该消息被重新传送的次数。可以从消息属性中获取此信息JMSXDeliveryCount
:
int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
JMSXDeliveryCount
不是一个新属性;它是在 JMS 1.1 中定义的。然而,在 JMS 1.1 中,JMS 提供程序可以选择是否实际设置它,这意味着使用它的应用程序代码不可移植。在 JMS 2.0 中,JMS 提供程序必须设置此属性,以便可移植应用程序能够使用它。
那么为什么应用程序想要知道消息被重新传递了多少次?
如果重新传送消息,则意味着之前传送该消息的尝试由于某种原因失败。如果重复重新传送消息,则原因可能是接收应用程序出现问题。应用程序可能能够接收消息但无法处理它,因此会抛出异常或回滚事务。如果由于某种原因导致消息无法处理,例如消息在某种程度上“不好”,则同一条消息将被反复重新传送,从而浪费资源并阻止处理后续“好”消息。
此JMSXDeliveryCount
属性允许消费应用程序检测某条消息是否已被多次重新传送,因此在某种程度上属于“坏消息”。应用程序可以使用此信息采取一些特殊操作(而不是简单地触发另一次重新传送),例如消费该消息并将其发送到单独的“坏消息”队列以供管理员操作。
一些 JMS 提供商已经提供了非标准功能来检测重复重新传送的消息并将其转移到死消息队列。虽然 JMS 2.0 定义了应如何处理此类消息,但该JMSXDeliveryCount
属性允许应用程序以可移植的方式实现自己的“坏”消息处理代码。
清单 8 显示了MessageListener
抛出RuntimeException
模拟处理“坏”消息时出错的 。MessageListener
使用该JMSXDeliveryCount
属性来检测消息是否已被重新传递十次并采取不同的操作。
class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
if (deliveryCount<10){
// now throw a RuntimeException
// to simulate a problem processing the message
// the message will then be redelivered
throw new RuntimeException("Exception thrown to simulate a bad message");
} else {
// message has been redelivered ten times,
// let's do something to prevent endless redeliveries
// such as sending it to dead message queue
// details omitted
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
清单 8
MDB 配置属性
需要异步接收消息的 Java EE 应用程序使用 MDB 来实现,MDB 是通过指定多个配置属性来配置的。
Java EE 的早期版本对于如何配置 MDB 的规定非常模糊。在 EJB 3.1 中,仅定义了以下配置属性:
acknowledgeMode
(仅当事务由 bean 管理时使用;可以设置为Auto-acknowledge
或Dups-ok-acknowledg
e)messageSelector
destinationType
(可以设置为javax.jms.Queue
或javax.jms.Topic
)subscriptionDurability
(仅用于主题;可以设置为Durable
或NonDurable
)
但是,EJB 3.1 并未定义应用程序应如何指定 MDB 从哪个队列或主题接收消息。这留给应用程序服务器或资源适配器来定义一种非标准方法来实现。
EJB 3.1 也没有定义当从主题接收消息并subscriptionDurability
设置属性Durable
时应如何指定订阅名称和客户端标识符。并且 EJB 3.1 中没有标准方法来指定 MDB 用于创建与 JMS 服务器的连接的连接工厂。
这些相当令人惊讶的限制都在最新版本的 Java EE 中得到了解决。EJB 3.2(Java EE 7 的一部分)定义了以下附加配置属性:
destinationLookup
:管理定义的队列或主题对象的 JNDI 名称,该对象表示 MDB 将从中接收消息的队列或主题connectionFactoryLookup``ConnectionFactory
: MDB 将用来连接到 JMS 提供程序的管理定义对象的 JNDI 名称clientId
:MDB 连接到 JMS 提供程序时使用的客户端标识符subscriptionName``subscriptionDurability
:设置为时使用的持久订阅名称Durable
无论如何,大多数应用服务器都支持clientId
,subscriptionName
因此将这些定义为标准只是将现有实践形式化。
当然,始终可以配置 JMS MDB 使用的队列或主题,并且许多(但不是全部)应用服务器都提供了指定连接工厂的方法。但是,这样做的方式是非标准的,并且因应用服务器而异。应用服务器仍然可以继续支持这些非标准机制。但是,您可以确信使用多个应用服务器的应用程序destinationLookup
将connectionFactoryLookup
与多个应用服务器一起工作。
清单 9 展示了一个 JMS MDB,它使用来自某个主题的持久订阅的消息并使用新的标准属性:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(
propertyName = "connectionFactoryLookup", propertyValue = "jms/MyConnectionFactory"),
@ActivationConfigProperty(
propertyName = "destinationLookup", propertyValue = "jmq/PriceFeed"),
@ActivationConfigProperty(
propertyName = "destinationType ", propertyValue = "javax.jms.Topic "),
@ActivationConfigProperty(
propertyName = "subscriptionDurability ", propertyValue = "Durable"),
@ActivationConfigProperty(
propertyName = "subscriptionName", propertyValue = "MySub"),
@ActivationConfigProperty(
propertyName = "clientId", propertyValue = "MyClientId") })
public class MyMDB implements MessageListener {
public void onMessage(Message message) {
...
清单 9
结论
上述五项特性使 Java 开发人员能够更轻松地进行消息传递。结合第一部分中讨论的易用性特性,它们代表着 JMS 2.0 向前迈出了一大步 — JMS 2.0 将继续蓬勃发展,成为 Java 领域最成功的 API 之一。
也可以看看
关于作者
Nigel Deakin 是 Oracle 的首席技术人员,曾担任 JSR 343(Java 消息服务 2.0)的规范负责人。除了负责领导 JMS 规范的下一版本之外,他还是 Oracle JMS 开发团队的成员,致力于 Open Message Queue 和 GlassFish 应用服务器。他最近在美国旧金山的 JavaOne 和比利时安特卫普的 Devoxx 上发表过演讲,目前居住在英国剑桥。
加入对话
在Facebook、Twitter和Oracle Java 博客上加入 Java 社区讨论!