源码构建

1、从 https://github.com/apache/activemq 下载源码

1
git clone https://github.com/apache/activemq

2、切换至 activemq-5.18.x 分支

1
2
cd activemq
git checkout activemq-5.18.x

3、构建源码

1
mvn package -DskipTests

构建成功之后,在 assembly/target 目录可以看到打包好的压缩文件。

在 Idea 中运行代码 activemq-console

在 Idea 中运行代码 activemq-console,启动 activemq:

1、在 idea 中打开项目

2、解压 assembly/target 目录下生成的压缩文件,将 conf、webapps 和 lib 目录拷贝到项目的根目录下面。

3、在idea中选中子模块 activemq-console,右击 选择 Open Module Settings —> Modules —> Dependencies,点击+(加号) —>选择 JARS or directories,把 lib 中的包(包括子目录下的)全部导入

4、配置从 activemq-console 下的 Main 类启动,在 Program arguments 添加 start

5、debug 运行 Main.java 类

在 Idea 中运行单元测试类

分析 Main.java 类的 main 方法,调用链如下:

1
Main.java -> ShellCommand.java -> StartCommand.java -> BrokerService.java

从 BrokerService 类,可以看到启动 broker 的关键代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
final BrokerService broker;
try {
    // If no config uri, use default setting
    if (brokerURIs.isEmpty()) {
        configURI = new URI(DEFAULT_CONFIG_URI);
    } else {
        configURI = new URI(brokerURIs.get(0));
    }

    System.out.println("Loading message broker from: " + configURI);
    broker = BrokerFactory.createBroker(configURI);
    broker.start();

} catch (Exception e) {
    context.printException(new RuntimeException("Failed to execute start task. Reason: " + e, e));
    throw e;
}

在 idea 中全局搜索 BrokerService 类,可以看到很多单元测试类都用到了 BrokerService 类。比如 activemq-broker 模块中的 BrokerServiceTest 、BrokerInterceptorsTest 、IDERunner。

BrokerInterceptorsTest 类如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class BrokerInterceptorsTest {
    private BrokerService brokerService;

    @Before
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setAdvisorySupport(true);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.start();
    }

    @After
    public void tearDown() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
        }
    }

    @Test
    public void testNavigateInterceptors() throws Exception {
        Broker b = brokerService.getBroker();
        Assert.assertTrue(b instanceof BrokerFilter);
        
        BrokerFilter bf = (BrokerFilter) b;
        int count = 0;
        while (bf != null) {
            Broker next = bf.getNext();
            bf = next instanceof BrokerFilter ? (BrokerFilter) next : null;
            count++;
        }
        // a few Broker interceptors are created because of the config (i.e. AdvisoryBroker)
        Assert.assertTrue(count > 1);
    }
}

debug 运行 BrokerInterceptorsTest 类,可以跟踪代码,分析 BrokerService 初始化过程。

如果想查看 TransportConnector 初始化过程,可以调试 IDERunner 类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public class IDERunner {

    private static final boolean TRANSPORT_TRACE = false;

    public static void main(String[]args) throws Exception {
        BrokerService brokerService = new BrokerService();

        brokerService.addConnector(
            "tcp://0.0.0.0:61616?trace=" + TRANSPORT_TRACE +
                "&transport.wireFormat.maxFrameSize=104857600");

        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(false);

        brokerService.start();

        brokerService.waitUntilStopped();
    }
}