Installing and Using ActiveMQ

ActiveMQ is an open-source message broker that implements the JMS API. It currently comes in two flavors: ActiveMQ "Classic" and ActiveMQ Artemis.

Artemis was created as the replacement for the Classic version and improves on it in several respects.

For a comparison of the ActiveMQ Classic releases, see: https://activemq.apache.org/components/classic/download/

This article uses ActiveMQ Classic 5.16.7 for all tests.

Installing from the archive

Download and unpack on a Linux server:

wget https://archive.apache.org/dist/activemq/5.16.7/apache-activemq-5.16.7-bin.tar.gz
tar -zxvf apache-activemq-5.16.7-bin.tar.gz
cd apache-activemq-5.16.7

Start the broker:

$ bin/activemq start
INFO: Loading '/data/apache-activemq-5.16.7/bin/env'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/data/apache-activemq-5.16.7/data/activemq.pid' (pid '31850')

Check that it started successfully:

$ netstat -antpule|grep 61616
tcp6 0 0 :::61616 :::* LISTEN 0 382969263 31850/java
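For scripted checks, the same test can be written as a small filter over the netstat output. This is only a sketch: it assumes the column layout shown above (local address in the fourth field, state in the sixth), and `port_listening` is a hypothetical helper name, not something shipped with ActiveMQ.

```shell
# Hypothetical helper: read netstat-style lines on stdin and succeed
# if some socket is in LISTEN state on the given port.
# Assumes field 4 is the local address and field 6 is the state.
port_listening() {
  awk -v suffix=":$1" '
    $6 == "LISTEN" && length($4) >= length(suffix) &&
      substr($4, length($4) - length(suffix) + 1) == suffix { found = 1 }
    END { exit (found ? 0 : 1) }
  '
}
```

A typical call would be `netstat -ant | port_listening 61616 && echo "broker is listening"`. Matching on the `:PORT` suffix keeps the check independent of whether the socket is bound to an IPv4 or IPv6 address.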

View the log:

tail -f data/activemq.log

Open a browser and visit http://127.0.0.1:8161/admin. The default account is admin, with password admin.

To make the console reachable through the server's IP address, edit jetty.xml in the conf directory, change 127.0.0.1 to 0.0.0.0 in the bean below, and restart.

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="127.0.0.1"/>
<property name="port" value="8161"/>
</bean>

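The edit to jetty.xml can also be scripted. The sketch below assumes the host property appears exactly as in the snippet above, with the literal value 127.0.0.1; `set_jetty_host` is a hypothetical helper name. It keeps a `.bak` copy and only touches a host property whose value is the loopback address, so the Connector bean that reads `jetty.host` from system properties is left alone.

```shell
# Hypothetical helper: change the web console bind address in jetty.xml.
# $1: path to jetty.xml, $2: new host value (for example 0.0.0.0).
set_jetty_host() {
  sjh_file=$1
  sjh_host=$2
  # Keep the original so the change can be reverted.
  cp "$sjh_file" "$sjh_file.bak" || return 1
  # Only rewrite host properties that still carry the loopback address.
  sed "/<property name=\"host\"/ s|value=\"127\.0\.0\.1\"|value=\"$sjh_host\"|" \
    "$sjh_file.bak" > "$sjh_file"
}
```

Called as `set_jetty_host conf/jetty.xml 0.0.0.0`, followed by a broker restart.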
To run ActiveMQ as a systemd service, create the file /usr/lib/systemd/system/activemq.service:

[Unit]
Description=ActiveMQ service
After=network.target
[Service]
Type=forking
ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop
User=root
Group=root
Restart=always
RestartSec=9
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=activemq
[Install]
WantedBy=multi-user.target

Make sure the paths in ExecStart and ExecStop match where ActiveMQ is installed; in this example it is installed under /opt.
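Because the unit file only differs by the installation path, it can be generated from a template. The following is a minimal sketch that reproduces the unit shown above for an arbitrary directory; `render_activemq_unit` is a hypothetical helper, and the directory passed to it is whatever path ActiveMQ was unpacked into.

```shell
# Hypothetical helper: print the activemq.service unit for a given
# installation directory ($1), using the same directives as above.
render_activemq_unit() {
  rau_home=$1
  cat <<EOF
[Unit]
Description=ActiveMQ service
After=network.target
[Service]
Type=forking
ExecStart=$rau_home/bin/activemq start
ExecStop=$rau_home/bin/activemq stop
User=root
Group=root
Restart=always
RestartSec=9
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=activemq
[Install]
WantedBy=multi-user.target
EOF
}
```

For the archive unpacked earlier, `render_activemq_unit /data/apache-activemq-5.16.7 > /usr/lib/systemd/system/activemq.service` followed by `systemctl daemon-reload` would install it.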

Managing ActiveMQ with systemctl

# Start the activemq service:
systemctl start activemq
# Check the service status:
systemctl status activemq
# Create the symlink:
ln -s /usr/lib/systemd/system/activemq.service /etc/systemd/system/multi-user.target.wants/activemq.service
# Enable start on boot:
systemctl enable activemq
# Verify that the service is enabled:
systemctl list-unit-files |grep activemq

Stop the firewall:

systemctl stop firewalld.service

Or open the required ports and restart the firewall:

firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent
systemctl restart firewalld.service
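If more connectors are exposed later (the default activemq.xml also defines AMQP, STOMP, MQTT and WebSocket ports), the same commands can be wrapped in a loop. This is a sketch only: `open_activemq_ports` is a hypothetical helper, and `FIREWALL_CMD` is an assumed override variable that makes a dry run possible by substituting `echo`. It uses `firewall-cmd --reload` to apply the permanent rules instead of restarting the service.

```shell
# Hypothetical helper: open each TCP port given as an argument.
# FIREWALL_CMD is an assumed override; set it to "echo" for a dry run.
open_activemq_ports() {
  for oap_port in "$@"; do
    "${FIREWALL_CMD:-firewall-cmd}" --zone=public \
      --add-port="$oap_port/tcp" --permanent || return 1
  done
  # Apply the permanent rules to the running firewall.
  "${FIREWALL_CMD:-firewall-cmd}" --reload
}
```

For example, `FIREWALL_CMD=echo open_activemq_ports 8161 61616` prints the commands without running them, and dropping the override applies them.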

Installing with Docker

docker run -d --name activemq -p 61616:61616 -p 8161:8161 apache/activemq-classic:5.16.7

Installing with Docker Compose

docker-compose.yml:

services:
  activemq:
    image: apache/activemq-classic:5.16.7
    environment:
      - "TZ=Asia/Shanghai"
    volumes:
      - activemq_data:/opt/apache-activemq/data
    ports:
      - "61616:61616"
      - "8161:8161"
volumes:
  activemq_data:

Start the service:

docker-compose -f docker-compose.yml up -d

List the containers:

$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
49166db84713 apache/activemq-classic "/usr/local/bin/entr…" 10 minutes ago Up 4 minutes 1099/tcp, 1883/tcp, 5672/tcp, 0.0.0.0:8161->8161/tcp, :::8161->8161/tcp, 61613-61614/tcp, 0.0.0.0:61616->61616/tcp, :::61616->61616/tcp messaging-examples-activemq-1

View the logs:

docker logs -f messaging-examples-activemq-1

Sending and receiving messages

The following example sends messages:

$ bin/activemq producer --message "hello" --messageCount 10
INFO | Connecting to URL: failover://tcp://localhost:61616 as user: null
INFO | Producing messages to queue://TEST
INFO | Using persistent messages
INFO | Sleeping between sends 0 ms
INFO | Running 1 parallel threads
INFO | Successfully connected to tcp://localhost:61616
INFO | producer-1 Started to calculate elapsed time ...
INFO | producer-1 Produced: 10 messages
INFO | producer-1 Elapsed time in second : 0 s
INFO | producer-1 Elapsed time in milli second : 25 milli seconds

The page http://127.0.0.1:8161/admin/queues.jsp now shows 10 messages in the TEST queue.

Receive the messages:

$ bin/activemq consumer
INFO | Connecting to URL: failover://tcp://localhost:61616 as user: null
INFO | Consuming queue://TEST
INFO | Sleeping between receives 0 ms
INFO | Running 1 parallel threads
INFO | Successfully connected to tcp://localhost:61616
INFO | consumer-1 wait until 1000 messages are consumed
INFO | consumer-1 Received hello

Options supported by bin/activemq producer:

$ bin/activemq producer -h
Usage: producer [OPTIONS]
Description: Demo producer that can be used to send messages to the broker
Options :
[--brokerUrl URL] - connection factory url; default ActiveMQConnectionFactory.DEFAULT_BROKER_URL
[--user ..] - connection user name
[--password ..] - connection password
[--destination queue://..|topic://..] - producer destination; default queue://TEST
[--persistent true|false] - use persistent or non persistent messages; default true
[--messageCount N] - number of messages to send; default 1000
[--sleep N] - millisecond sleep period between sends or receives; default 0
[--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions
[--parallelThreads N] - number of threads to run in parallel; default 1
[--msgTTL N] - message TTL in milliseconds
[--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
[--message ..] - a text string to use as the message body
[--payloadUrl URL] - a url pointing to a document to use as the message body
[--msgGroupID ..] - JMS message group identifier

Options supported by bin/activemq consumer:

$ bin/activemq consumer -h
Usage: consumer [OPTIONS]
Description: Demo consumer that can be used to receive messages to the broker
Options :
[--brokerUrl URL] - connection factory url; default ActiveMQConnectionFactory.DEFAULT_BROKER_URL
[--user ..] - connection user name
[--password ..] - connection password
[--destination queue://..|topic://..] - consumer destination; default queue://TEST
[--messageCount N] - number of messages to send; default 1000
[--sleep N] - millisecond sleep period between sends or receives; default 0
[--ackMode AUTO_ACKNOWLEDGE|CLIENT_ACKNOWLEDGE] - the type of message acknowledgement to use; default auto acknowledge
[--batchSize N] - batch size for transactions and client acknowledgment (default 10)
[--durable true|false] - create durable topic
[--clientId ..] - connection client id; must be set for durable topics
[--parallelThreads N] - number of threads to run in parallel; default 1
[--bytesAsText true|false] - try to treat a BytesMessage as a text string
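The options listed above can be combined to target a queue other than the default TEST. The sketch below wraps the two demo commands in hypothetical helper functions; only flags that appear in the help output are used. `ACTIVEMQ_BIN` is an assumed override variable (defaulting to `bin/activemq`) so the command line can be inspected without a running broker.

```shell
# Hypothetical helpers around the demo producer and consumer.
# ACTIVEMQ_BIN is an assumed override; it defaults to bin/activemq.

# send_messages QUEUE TEXT COUNT
send_messages() {
  "${ACTIVEMQ_BIN:-bin/activemq}" producer \
    --destination "queue://$1" \
    --message "$2" \
    --messageCount "$3"
}

# receive_messages QUEUE COUNT
receive_messages() {
  "${ACTIVEMQ_BIN:-bin/activemq}" consumer \
    --destination "queue://$1" \
    --messageCount "$2"
}
```

For example, `send_messages DEMO hello 3` followed by `receive_messages DEMO 3` sends three messages to queue://DEMO and then reads them back. Passing the same count to the consumer matters because, as the log above shows, it otherwise waits for the default of 1000 messages.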

Configuration files

activemq.xml:

<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!-- Allows accessing the server log -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
The web consoles requires by default login, you can disable this in the jetty.xml file
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>

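users.properties: defines the usernames and passwords used by the broker's JAAS login module (see login.config below). The web console login is defined separately in jetty-realm.properties.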

admin=admin

jetty.xml: the configuration file for the embedded Jetty server, which controls the behavior of the web server

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="securityLoginService" class="org.eclipse.jetty.security.HashLoginService">
<property name="name" value="ActiveMQRealm" />
<property name="config" value="${activemq.conf}/jetty-realm.properties" />
</bean>
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="user,admin" />
<!-- set authenticate=false to disable login -->
<property name="authenticate" value="true" />
</bean>
<bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="admin" />
<!-- set authenticate=false to disable login -->
<property name="authenticate" value="true" />
</bean>
<bean id="securityConstraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
<property name="constraint" ref="securityConstraint" />
<property name="pathSpec" value="/,/api/*,*.jsp,*.html,*.js,*.css,*.png,*.gif,*.ico" />
</bean>
<bean id="adminSecurityConstraintMapping" class="org.eclipse.jetty.security.ConstraintMapping">
<property name="constraint" ref="adminSecurityConstraint" />
<property name="pathSpec" value="*.action" />
</bean>
<bean id="rewriteHandler" class="org.eclipse.jetty.rewrite.handler.RewriteHandler">
<property name="rules">
<list>
<bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
<property name="pattern" value="*"/>
<property name="name" value="X-FRAME-OPTIONS"/>
<property name="value" value="SAMEORIGIN"/>
</bean>
<bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
<property name="pattern" value="*"/>
<property name="name" value="X-XSS-Protection"/>
<property name="value" value="1; mode=block"/>
</bean>
<bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
<property name="pattern" value="*"/>
<property name="name" value="X-Content-Type-Options"/>
<property name="value" value="nosniff"/>
</bean>
</list>
</property>
</bean>
<bean id="secHandlerCollection" class="org.eclipse.jetty.server.handler.HandlerCollection">
<property name="handlers">
<list>
<ref bean="rewriteHandler"/>
<bean class="org.eclipse.jetty.webapp.WebAppContext">
<property name="contextPath" value="/admin" />
<property name="resourceBase" value="${activemq.home}/webapps/admin" />
<property name="logUrlOnStart" value="true" />
</bean>
<bean class="org.eclipse.jetty.webapp.WebAppContext">
<property name="contextPath" value="/api" />
<property name="resourceBase" value="${activemq.home}/webapps/api" />
<property name="logUrlOnStart" value="true" />
</bean>
<bean class="org.eclipse.jetty.server.handler.ResourceHandler">
<property name="directoriesListed" value="false" />
<property name="welcomeFiles">
<list>
<value>index.html</value>
</list>
</property>
<property name="resourceBase" value="${activemq.home}/webapps/" />
</bean>
<bean id="defaultHandler" class="org.eclipse.jetty.server.handler.DefaultHandler">
<property name="serveIcon" value="false" />
</bean>
</list>
</property>
</bean>
<bean id="securityHandler" class="org.eclipse.jetty.security.ConstraintSecurityHandler">
<property name="loginService" ref="securityLoginService" />
<property name="authenticator">
<bean class="org.eclipse.jetty.security.authentication.BasicAuthenticator" />
</property>
<property name="constraintMappings">
<list>
<ref bean="adminSecurityConstraintMapping" />
<ref bean="securityConstraintMapping" />
</list>
</property>
<property name="handler" ref="secHandlerCollection" />
</bean>
<bean id="contexts" class="org.eclipse.jetty.server.handler.ContextHandlerCollection">
</bean>
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
<bean id="Server" depends-on="jettyPort" class="org.eclipse.jetty.server.Server"
destroy-method="stop">
<property name="handler">
<bean id="handlers" class="org.eclipse.jetty.server.handler.HandlerCollection">
<property name="handlers">
<list>
<ref bean="contexts" />
<ref bean="securityHandler" />
</list>
</property>
</bean>
</property>
</bean>
<bean id="invokeConnectors" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="targetObject" ref="Server" />
<property name="targetMethod" value="setConnectors" />
<property name="arguments">
<list>
<bean id="Connector" class="org.eclipse.jetty.server.ServerConnector">
<constructor-arg ref="Server" />
<!-- see the jettyPort bean -->
<property name="host" value="#{systemProperties['jetty.host']}" />
<property name="port" value="#{systemProperties['jetty.port']}" />
</bean>
<!--
Enable this connector if you wish to use https with web console
-->
<!-- bean id="SecureConnector" class="org.eclipse.jetty.server.ServerConnector">
<constructor-arg ref="Server" />
<constructor-arg>
<bean id="handlers" class="org.eclipse.jetty.util.ssl.SslContextFactory">
<property name="keyStorePath" value="${activemq.conf}/broker.ks" />
<property name="keyStorePassword" value="password" />
</bean>
</constructor-arg>
<property name="port" value="8162" />
</bean -->
</list>
</property>
</bean>
<bean id="configureJetty" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="staticMethod" value="org.apache.activemq.web.config.JspConfigurer.configureJetty" />
<property name="arguments">
<list>
<ref bean="Server" />
<ref bean="secHandlerCollection" />
</list>
</property>
</bean>
<bean id="invokeStart" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"
depends-on="configureJetty, invokeConnectors">
<property name="targetObject" ref="Server" />
<property name="targetMethod" value="start" />
</bean>
</beans>

jetty-realm.properties: Jetty's authentication file, which defines the users, roles and permissions for web console login

jmx.access and jmx.password define the access levels and passwords of JMX users.

login.config: configures the login authentication mechanism

activemq {
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};

users.properties and groups.properties: define users and groups

Configuring authentication and authorization

Using ActiveMQ's built-in SimpleAuthenticationPlugin

1. Write the username and password directly into activemq.xml

<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="username" password="password" groups="users, admins" />
</users>
</simpleAuthenticationPlugin>
</plugins>

2. Store plaintext credentials in credentials.properties

<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users, admins" />
</users>
</simpleAuthenticationPlugin>
</plugins>

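The top of activemq.xml imports file:${activemq.conf}/credentials.properties, so properties defined in that file can be referenced as variables. The attribute groups="users, admins" lists the groups the user belongs to, and ActiveMQ assigns permissions per group. Note that the groups attribute must not be removed, although it may be left empty.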

3. Store encrypted credentials in credentials-enc.properties

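ActiveMQ allows credentials to be encrypted before they are stored in a configuration file. This is safer but also more work. The steps below follow the official documentation, adjusted according to what actually worked in testing.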

Create the encrypted credential:

bin/activemq encrypt --password activemq --input password

Decryption is the reverse operation and must be given the same encryption password:

bin/activemq decrypt --password activemq --input FJnN6inNmqDigYEs4wDgkwbe3l2B7mQr

Write the encrypted credential into credentials-enc.properties:

activemq.username=admin
activemq.password=ENC(FJnN6inNmqDigYEs4wDgkwbe3l2B7mQr)

Edit the import section at the top of activemq.xml: comment out the original credentials.properties import and add three new beans.

<!-- Allows us to use system properties as variables in this configuration file -->
<!-- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean> -->
<bean id="environmentVariablesConfiguration" class="org.jasypt.encryption.pbe.config.EnvironmentStringPBEConfig">
<property name="algorithm" value="PBEWithMD5AndDES" />
<property name="passwordEnvName" value="ACTIVEMQ_ENCRYPTION_PASSWORD" />
</bean>
<bean id="configurationEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
<property name="config" ref="environmentVariablesConfiguration" />
</bean>
<bean id="propertyConfigurer" class="org.jasypt.spring31.properties.EncryptablePropertyPlaceholderConfigurer">
<constructor-arg ref="configurationEncryptor" />
<property name="location" value="file:${activemq.base}/conf/credentials-enc.properties"/>
</bean>

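Together, the three beans load the encryption password from the environment variable ACTIVEMQ_ENCRYPTION_PASSWORD and use it to decrypt the credentials in credentials-enc.properties, so the environment variable must be set before the broker starts. Alternatively, the encryption password can be written directly into the configuration file: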

<!-- Allows us to use system properties as variables in this configuration file -->
<!-- <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials-enc.properties</value>
</property>
</bean> -->
<bean id="configurationEncryptor" class="org.jasypt.encryption.pbe.StandardPBEStringEncryptor">
<property name="algorithm" value="PBEWithMD5AndDES"/>
<property name="password" value="activemq"/>
</bean>
<bean id="propertyConfigurer" class="org.jasypt.spring31.properties.EncryptablePropertyPlaceholderConfigurer">
<constructor-arg ref="configurationEncryptor" />
<property name="location" value="file:${activemq.base}/conf/credentials-enc.properties"/>
</bean>

Start the broker. If the encryption password is supplied through the environment variable, set it first:

export ACTIVEMQ_ENCRYPTION_PASSWORD=activemq
bin/activemq start
unset ACTIVEMQ_ENCRYPTION_PASSWORD
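An alternative to the export/unset pair is to set the variable only for the start command itself, so it never enters the interactive shell's environment. The sketch below assumes the encryption password is kept in a file readable only by the operator; `start_with_encryption_password` and the `ACTIVEMQ_BIN` override are hypothetical names introduced for illustration.

```shell
# Hypothetical helper: start the broker with the encryption password
# taken from the first line of a file ($1). The variable is passed to
# the child process only and is never exported in the calling shell.
# ACTIVEMQ_BIN is an assumed override; it defaults to bin/activemq.
start_with_encryption_password() {
  swep_pw=$(head -n 1 "$1") || return 1
  ACTIVEMQ_ENCRYPTION_PASSWORD="$swep_pw" "${ACTIVEMQ_BIN:-bin/activemq}" start
  swep_status=$?
  unset swep_pw
  return "$swep_status"
}
```

It would be called as `start_with_encryption_password /path/to/password-file`, where the path is a placeholder for wherever the password is stored.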

Using the JAASAuthentication Plugin

JAAS stands for Java Authentication and Authorization Service. The JAASAuthentication Plugin relies on the standard JAAS mechanism for authentication and by default reads its configuration from login.config.

activemq {
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};

login.config refers to users.properties and groups.properties, which hold the username/password pairs and the group memberships respectively.

users.properties:

admin=admin

groups.properties:

admins=admin
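Since permissions are granted per group, it helps to be able to check which groups a user ends up in. The sketch below scans a groups.properties file in the format shown above (group name on the left, comma-separated members on the right); `groups_for_user` is a hypothetical helper written for illustration.

```shell
# Hypothetical helper: print every group in a groups.properties file
# ($2) that lists the given user ($1) as a member.
groups_for_user() {
  awk -F '=' -v u="$1" '
    /^[ \t]*(#|$)/ { next }            # skip comments and blank lines
    {
      n = split($2, members, ",")
      for (i = 1; i <= n; i++) {
        m = members[i]
        gsub(/^[ \t]+|[ \t]+$/, "", m)  # trim spaces around each name
        if (m == u) { print $1; break }
      }
    }
  ' "$2"
}
```

With the file above, `groups_for_user admin conf/groups.properties` prints `admins`.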

Add the JAASAuthentication Plugin configuration to activemq.xml:

<plugins>
<jaasAuthenticationPlugin configuration="activemq" />
</plugins>

configuration="activemq" refers to the entry of that name in login.config.

Using authorizationPlugin for authorization

ActiveMQ can control read, write and create (admin) permissions on queues and topics. Permissions are assigned per group.

<plugins>
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue="activemq.>" read="users" write="users" admin="users"/>
<authorizationEntry topic="USER.>" read="admins" write="admins" admin="admins"/>
</authorizationEntries>
</authorizationMap>
</map>
</authorizationPlugin>
</plugins>
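
The > character is a wildcard. The configuration above means that only the users group can read, write and create queues whose names start with "activemq.", and only the admins group can read, write and create topics whose names start with "USER.". Note that the dot before the wildcard in "activemq.>" is required, otherwise the entry has no effect; in other words, the prefix must end with a dot. The * wildcard can be used as well, but it matches a single name segment rather than everything after the prefix.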

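To get a feel for which destination names an authorization entry covers, the wildcard rules can be modelled in a few lines of shell. This is a simplified sketch, not the broker's implementation: it assumes names are split on dots, that * matches exactly one segment, and that > matches one or more remaining segments. Edge cases, such as whether "A.>" also covers the bare name "A", are not modelled and should be checked against the broker. `dest_matches` is a hypothetical helper name.

```shell
# Hypothetical helper: succeed if destination name $2 matches the
# wildcard pattern $1 under the simplified rules described above.
dest_matches() {
  dm_pat=$1
  dm_name=$2
  while [ -n "$dm_pat" ] && [ -n "$dm_name" ]; do
    dm_p=${dm_pat%%.*}     # first segment of the pattern
    dm_n=${dm_name%%.*}    # first segment of the name
    case $dm_p in
      '>') return 0 ;;     # matches everything that remains
      '*') ;;              # matches any single segment
      *) [ "$dm_p" = "$dm_n" ] || return 1 ;;
    esac
    # Drop the segment that was just compared.
    case $dm_pat in *.*) dm_pat=${dm_pat#*.} ;; *) dm_pat= ;; esac
    case $dm_name in *.*) dm_name=${dm_name#*.} ;; *) dm_name= ;; esac
  done
  # Both sides must be fully consumed for an exact-length match.
  [ -z "$dm_pat" ] && [ -z "$dm_name" ]
}
```

For example, `dest_matches 'activemq.>' 'activemq.orders.new'` succeeds, while `dest_matches 'USER.*' 'USER.a.b'` fails because * covers only one segment.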