[译]JMS 发布-订阅消息模型
在本文中,您将了解 JMS 发布-订阅 (publish-subscribe) 消息传递模型。正如您在JMS 简介文章中所读到的,在发布/订阅模型中,客户端通过称为主题的中介将消息发送给多个接收者。发送者通常称为发布者,接收者称为订阅者。
JMS 发布/订阅消息传递示例
下面是一个简单的代码示例,演示了发布/订阅消息模型的工作原理。我创建了 2 个主线程,publisher和subscriber1。将订阅者 1 克隆到subscriber2。所以基本上我有一个消息发布者和 2 个消息订阅者。链接至GitHub 。
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;import javax.jms.*;
public class SimplePubSubExample { private static ConnectionFactory connectionFactory = null; private static Topic defaultTopic = null; static { connectionFactory = CommonSettings.getConnectionFactory(); defaultTopic = CommonSettings.getDefautTopic(); }
public static void main(String[] args) { Thread publisher = new Thread(){ @Override public void run(){ try(JMSContext jmsContext = connectionFactory.createContext()) { Thread.sleep(1000); JMSProducer producer = jmsContext.createProducer(); TextMessage message = jmsContext.createTextMessage("World needs to worry about the Climate changes"); producer.send(defaultTopic, message); } catch (InterruptedException ex){ ex.printStackTrace(); } } };
Thread subscriber1 = new Thread(){ @Override public void run(){ try(JMSContext jmsContext = connectionFactory.createContext()) { JMSConsumer consumer = jmsContext.createConsumer(defaultTopic); System.out.println("Message received: " + consumer.receive().getBody(String.class)); } catch (JMSException e){ e.printStackTrace(); } } };
Thread subscriber2 = new Thread(subscriber1); publisher.start(); subscriber1.start(); subscriber2.start(); }}package labxx.common.settings;
import javax.jms.ConnectionFactory;import javax.jms.Queue;import javax.jms.Topic;import javax.naming.InitialContext;import javax.naming.NamingException;
public class CommonSettings { private static ConnectionFactory CONNECTION_FACTORY = null; private static Queue PTP_QUEUE = null; private static Topic PUB_SUB_TOPIC = null; private static Queue DEFAULT_REPLY_QUEUE = null;
static { try { InitialContext initialContext = new InitialContext(); CONNECTION_FACTORY = (ConnectionFactory) initialContext.lookup("jms/__defaultConnectionFactory"); PTP_QUEUE = (Queue) initialContext.lookup("jms/PTPQueue"); DEFAULT_REPLY_QUEUE = (Queue) initialContext.lookup("jms/ReplyQueue"); PUB_SUB_TOPIC = (Topic) initialContext.lookup("jms/PubSubTopic"); } catch (NamingException e) { e.printStackTrace(); } }
public static ConnectionFactory getConnectionFactory() { return CONNECTION_FACTORY; } public static Queue getDefaultQueue() { return PTP_QUEUE; } public static Queue getDefaultReplyQueue() { return DEFAULT_REPLY_QUEUE; } public static Topic getDefautTopic() { return PUB_SUB_TOPIC; }}输出
Message received: World needs to worry about the Climate changesMessage received: World needs to worry about the Climate changes注意: 所有发布/订阅系统通常都存在一定延迟,您编写的代码应该将延迟考虑在内。
普通消息订阅者的问题
普通消息订阅者(如上例中使用的订阅者)的问题在于,它不耐用。也就是说,如果消费者因某种原因宕机(关闭),则在恢复后它将无法接收先前的消息Topic。
让我们看看下面的代码,发布者发送了 7 条消息,而消费者只收到一条消息。一旦消费者关闭并重新创建,它就不会从主题中获取消息。链接至**GitHub**。
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;import javax.jms.*;
public class NormalConsumerProblem { private static ConnectionFactory connectionFactory = null; private static Topic defaultTopic = null;
static { connectionFactory = CommonSettings.getConnectionFactory(); defaultTopic = CommonSettings.getDefautTopic(); }
public static void main(String[] args) { Thread publisher = new Thread() { @Override public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer(); Thread.sleep(1000); for (int i = 1; i < 7; i++) { producer.send(defaultTopic, "Update " + i); } } catch (InterruptedException e) { e.printStackTrace(); } } };
//Normal Consumer Thread consumer = new Thread() { @Override public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSConsumer consumer = jmsContext.createConsumer(defaultTopic); System.out.println(consumer.receive().getBody(String.class)); Thread.sleep(2000); consumer.close(); consumer = jmsContext.createConsumer(defaultTopic); for (int i = 1; i < 6; i++) { System.out.println(consumer.receive().getBody(String.class)); } } catch (JMSException | InterruptedException e) { e.printStackTrace(); } } }; publisher.start(); consumer.start(); }}输出
Update 1在实际场景中,您需要一种更可靠的方式来订阅主题。JMS 可以DurableConsumer解决这个问题。
JMS 发布-订阅中的 DurableConsumer 示例
- 当必须接收某个主题的所有消息时,
DurableConsumer应使用持久订阅者()。 - JMS 确保在持久订阅者处于非活动状态时发布的消息由 JMS 保留,并在订阅者随后变为活动状态时进行传送。
- 当消费者在不活动时可以承受错过消息的后果时,将使用非持久订阅者。
下面的代码演示了 的用法DurableConsumer。与前面的示例不同,此处的消费者从 接收所有消息topic。链接至GitHub。
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;import javax.jms.*;
public class DurableConsumerExample { private static ConnectionFactory connectionFactory = null; private static Topic defaultTopic = null;
static { connectionFactory = CommonSettings.getConnectionFactory(); defaultTopic = CommonSettings.getDefautTopic(); }
public static void main(String[] args) { Thread publisher = new Thread() { @Override public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer(); Thread.sleep(1000); for (int i = 1; i < 7; i++) { producer.send(defaultTopic, "Update " + i); } } catch (InterruptedException e) { e.printStackTrace(); } } };
//Durable Consumer Thread durableConsumer = new Thread() { @Override public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { jmsContext.setClientID("exampleApp");/important JMSConsumer consumer = jmsContext.createDurableConsumer(defaultTopic, "logConsumer"); System.out.println(consumer.receive().getBody(String.class)); Thread.sleep(2000); consumer.close(); consumer = jmsContext.createDurableConsumer(defaultTopic, "logConsumer"); for (int i = 1; i < 6; i++) { System.out.println(consumer.receive().getBody(String.class)); } } catch (JMSException | InterruptedException e) { e.printStackTrace(); } } };
publisher.start(); durableConsumer.start(); }}输出
Update 1Update 2Update 3Update 4Update 5Update 6注意: 设置 ClientID 很重要,这通常是订阅客户端应用程序的名称。另外,请记住设置名称,
durableConsumer如第 38 行和第 42 行所示。
SharedConsumer 进行负载平衡
在 JMS 2.0 中,多个订阅者可以监听一个主题,并且可以分配消息消费任务。有时,您可能需要对来自一个主题的多条消息进行负载平衡。这时您可以使用它来SharedConsumer分配消息消费负载。
下面的示例显示了创建sharedConsumer,请记住分配与第 37 行、第 38 行所示的相同的订阅者名称。代码链接至 GitHub。
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;import javax.jms.*;
public class SharedConsumerExample { private static ConnectionFactory connectionFactory = null; private static Topic defaultTopic = null;
static { connectionFactory = CommonSettings.getConnectionFactory(); defaultTopic = CommonSettings.getDefautTopic(); }
public static void main(String[] args) { Thread publisher = new Thread() { @Override public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer(); Thread.sleep(1000); for (int i = 1; i < 7; i++) { producer.send(defaultTopic, "Update " + i); } } catch (InterruptedException e) { e.printStackTrace(); } } };
//Shared Consumer Thread sharedConsumer = new Thread() { @Override public void run() { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSConsumer sharedConsumer1 = jmsContext.createSharedConsumer(defaultTopic, "sharedSubscriber"); JMSConsumer sharedConsumer2 = jmsContext.createSharedConsumer(defaultTopic, "sharedSubscriber"); for (int i = 0; i < 3; i++) { System.out.println("Shared Consumer1: " + sharedConsumer1.receive().getBody(String.class)); System.out.println("Shared Consumer2: " + sharedConsumer2.receive().getBody(String.class)); } Thread.sleep(3000); sharedConsumer1.close(); sharedConsumer2.close(); } catch (JMSException | InterruptedException e) { e.printStackTrace(); } } };
publisher.start(); sharedConsumer.start(); }}输出
Shared Consumer1: Update 1Shared Consumer2: Update 2Shared Consumer1: Update 3Shared Consumer2: Update 4Shared Consumer1: Update 5Shared Consumer2: Update 6此外,您还可以使用 ,SharedDurableConsumer它能为您提供DurableConsumer和 的综合能力SharedConsumer。
异步消息订阅
我在 JMS 点对点消息传递 文章中讨论了异步订阅。您可以使用相同的侦听器模式异步订阅主题。以下代码显示了异步消息订阅的一个简单示例。代码链接至GitHub。
package lab03.message.pubsub;
import labxx.common.settings.CommonSettings;import javax.jms.*;
public class AsyncPubSubExample { private static ConnectionFactory connectionFactory = null; private static Topic defaultTopic = null;
static { connectionFactory = CommonSettings.getConnectionFactory(); defaultTopic = CommonSettings.getDefautTopic(); }
public static void main(String[] args) throws InterruptedException { try (JMSContext jmsContext = connectionFactory.createContext()) { JMSProducer producer = jmsContext.createProducer(); JMSConsumer consumer = jmsContext.createConsumer(defaultTopic); consumer.setMessageListener(msg -> { try { System.out.println(msg.getBody(String.class)); } catch (JMSException e) { e.printStackTrace(); } }); for (int i = 1; i < 7; i++) { producer.send(defaultTopic, "Message " + i); } Thread.sleep(1000); consumer.close(); } }}输出
Message 1Message 2Message 3Message 4Message 5Message 6这些都是 JMS 发布-订阅消息模型的一部分,您已经清楚地了解了jms 发布/订阅中主题的使用。您了解了不同的消费者,例如 SharedConsumer、SharedDurableConsumer、DurableConsumer以及异步消息处理。请在下面的评论中分享您的想法。