TBMQ测试和源码分析

  1. 启动 TBMQ 程序,浏览器访问 http://localhost:8083,创建一个 Application,Credentials Type 为 BASIC,客户端 ID、用户名和密码均设置为 tbmq_app

  2. 使用 mosquitto 测试订阅消息

    mosquitto_sub -d -q 1 -h localhost -p 1883 -t tbmq/demo/+ -i 'tbmq_app' -u 'tbmq_app' -P 'tbmq_app' -c -v
    
  3. Debug 调试 MqttSessionHandler 类的 channelRead(ChannelHandlerContext ctx, Object msg) 方法

    1. 第一次连接,消息类型是 CONNECT

      • 消息:

        • 固定头:MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=40]

        • 变量头:MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=true, hasPassword=true, isWillRetain=false, isWillFlag=false, isCleanSession=false, keepAliveTimeSeconds=60]

        • 负载:MqttConnectPayload[clientIdentifier=tbmq_app, willTopic=null, willMessage=null, userName=tbmq_app, password=[116, 98, 109, 113, 95, 97, 112, 112]]

      • address 为空,故从 ChannelHandlerContext 获取客户端地址,并将地址保持到客户端 Session 上下文 ClientSessionCtx

      • 初始化 Session:获取 clientI、mqtt 版本,调用 ClientMqttActorManager 初始化 session:创建 clientActorRef,发送一个 SessionInitMsg 消息

      • 调用 ClientMqttActorManager connect 方法,发送一个 MqttConnectMsg 消息

    2. 第二次连接,消息类型是 SUBSCRIBE

      MqttSubscribeMessage[fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=16], variableHeader=MqttMessageIdAndPropertiesVariableHeader[messageId=1, properties=io.netty.handler.codec.mqtt.MqttProperties@15c52601], payload=MqttSubscribePayload[MqttTopicSubscription[topicFilter=tbmq/demo/+, option=SubscriptionOption[qos=AT_LEAST_ONCE, noLocal=false, retainAsPublished=false, retainHandling=SEND_AT_SUBSCRIBE]]]]

    3. 第三次连接,消息类型是 PINGREQ

MqttSessionHandler

第一次连接时,ClientMqttActorManagerImpl 初始化代码

@Override
public void initSession(String clientId, boolean isClientIdGenerated, SessionInitMsg sessionInitMsg) {
    TbActorRef clientActorRef = getActor(clientId);
    if (clientActorRef == null) {
        clientActorRef = createRootActor(clientId, isClientIdGenerated);
    }
    clientActorRef.tellWithHighPriority(sessionInitMsg);
}

可以看到:

  • MqttSessionHandler 将消息的处理委托给了 ClientMqttActorManager 对象。ClientMqttActorManager使用了 Actor 模型。

    • ClientMqttActorManager

      • ActorSystemContext

      • TbActorSystem

        • 通过 ClientActorCreator 创建一个 TbActorCtx,其实是创建了一个 TbActorMailbox

          private TbActorRef createRootActor(String clientId, boolean isClientIdGenerated) {
            return actorSystem.createRootActor(ActorSystemLifecycle.CLIENT_DISPATCHER_NAME,
                    new ClientActorCreator(actorSystemContext, clientId, isClientIdGenerated));
          }
          
        • ClientActorCreator 负责创建 ClientActor

  • NettyMqttConverter 负责将 MqttMessage 转换为 TbActorMsg。

  • clientActorRef.tellWithHighPriority 发送了一个高优先级的消息:clientActorRef.tellWithHighPriority(sessionInitMsg)。这个方法是 Actor 模型定义的方法,调用该方法意味着程序的执行进入了 Actor。关于 Actor 具体的执行逻辑,可以先忽略,主要关注 Actor 是在什么时候、怎么调用业务代码的(即如何处理 MQTT 消息的逻辑)。

    • 调用 TbActorMailbox的 enqueue(TbActorMsg msg, boolean highPriority) 方法
      • 调用 tryProcessQueue(true);
        • 调用 processMailbox() 方法
          • 调用 actor.process(msg); 方法,这里的 actor 是由 ClientActorCreator 创建的 ClientActor
            • 调用 ClientActor 的actor.process(msg); 方法
              • 如果 TimedMsg 则 clientActorStats.logMsgQueueTime(msg, TimeUnit.NANOSECONDS);
              • clientLogger 记录事件
              • 如果是队列消息 QueueableMqttMsg,调用 processQueueableMqttMsg((QueueableMqttMsg) msg); 方法
              • 如果是其他消息:
              • SessionInitMsg:ActorProcessor
                • 创建 AuthContext
                • 调用 AuthResponse authenticateClient(AuthContext authContext)
                  • 调用 authenticationService.authenticate(authContext)
                    • 调用 MqttClientAuthProvider的authenticate(AuthContext authContext)
                      • 调用 BasicMqttClientAuthProvider
                        • 认证密码 BasicMqttCredentials
                        • 调用 authorizationRuleService.parseBasicAuthorizationRule(credentials) 获取 AuthRulePatterns
                        • 返回 AuthResponse
                • finishSessionAuth(state.getClientId(), sessionCtx, authResponse) : ClientSessionCtx 保存客户端类型、authRulePatterns
                • 更新 ClientActorState
              • MQTT_CONNECT_MSG:
                • ConnectService startConnection
                  • ClientSessionCtx 保存 SessionInfo
                  • ClientSessionCtx 设置 TopicAliasCtx
                  • KeepAliveService 注册 session
                  • ClientSessionEventService requestConnection(SessionInfo sessionInfo)
                    • 发送 ClientSessionEventProto 到 MQ

MqttSessionHandler 类调用关系:

  • MqttSessionHandler

    • ClientMqttActorManager

      • TbActorMailbox

        • ContextAwareActor

          • ClientActor

            • 1:SessionInitMsg

              • ActorProcessor
                • AuthenticationService
                  • MqttClientAuthProvider
                    • MqttClientCredentialsService
                      • MqttClientCredentialsDao
              1. MqttConnectMsg
              • ConnectService
                • ClientSessionEventService
                  • TbQueueProducer
                    • ClientSessionEventConsumer
                      • ClientSessionEventActorManager
                        • TbActorRef
              1. ConnectionRequestMsg
              • SessionClusterManager
                • ClientSessionService
                  • ClientSessionPersistenceService
                    • TbQueueProducer
            • ConnectionAcceptedMsg

              • ConnectService
            • MqttDisconnectMsg

              • DisconnectService
            • SessionDisconnectedMsg

              • SessionClusterManager
                • ClientSessionService
                • ClientSubscriptionService

MqttSessionHandler 是一个用于处理MQTT会话的Java类。它实现了ChannelInboundHandlerAdapterGenericFutureListener接口,因此可以作为Netty框架中的处理组件。

实现原理:

  1. 类中包含了一个sessionId,用于标识每个会话。sessionId是随机生成的UUID,每次会话开始时都会重新生成一个新的sessionId。
  2. 类中包含了各种处理MQTT消息的方法,例如processMqttMsg方法。这个方法会根据接收到的MQTT消息类型进行相应的处理。例如,当收到CONNECT消息时,会调用initSession方法初始化会话。当收到PUBLISH消息时,会检查速率限制并处理消息。
  3. disconnect方法用于断开与客户端的连接。当收到断开连接的消息时,会调用这个方法。这个方法会关闭客户端的通道,并发送一个MQTT断开连接消息给客户端。
  4. getAddress方法用于获取客户端的连接地址。这个方法会从上下文中获取客户端的地址,如果获取失败,会使用远程地址作为备选方案。
  5. operationComplete方法实现了GenericFutureListener接口的方法。当一个操作完成时,会调用这个方法。在这个方法中,会检查clientId是否为null,如果是,则断开连接,发送一个MQTT断开连接消息给客户端。
  6. exceptionCaught方法用于处理异常。当发生异常时,会调用这个方法。这个方法会记录异常信息,并断开连接。
  7. channelReadComplete方法用于刷新缓冲区。当一条消息读取完成时,会调用这个方法。
  8. channelRead方法是主要处理方法。当接收到数据时,会调用这个方法进行处理。这个方法会先检查address是否为null,如果是,则生成一个新的address。然后,会根据接收到的MQTT消息类型进行相应的处理。

channelRead方法用于处理来自网络通道的消息。具体来说,它处理的是MQTT协议的消息。MQTT是一种轻量级的通信协议,用于简化网络应用程序之间的通信。这段代码的主要作用是解析和处理来自客户端的MQTT消息。

实现原理:

  1. 首先,代码检查address是否为null,如果是,则调用getAddress(ctx)获取客户端的地址,并将其设置到address变量中。同时,将address设置到clientSessionCtx中。这样可以方便地在日志中记录客户端的地址。
  2. 如果log.isTraceEnabled()为true,则记录下当前处理的消息,以便于调试和日志分析。
  3. 将ctx设置到clientSessionCtx中,这样后续的处理都可以通过clientSessionCtx来进行。
  4. 检查接收到的消息msg是否是MqttMessage类型。如果不是,则记录一条警告信息,并断开与客户端的连接。
  5. 如果消息msg的解码结果不成功,则记录一条警告信息,并断开与客户端的连接。如果消息msg的解码结果是TooLongFrameException异常,则断开连接的原因是ON_PACKET_TOO_LARGE,否则断开连接的原因是ON_MALFORMED_PACKET。
  6. 调用processMqttMsg方法处理MqttMessage类型的消息。
  7. 在处理消息后, finally 块中,通过ReferenceCountUtil.safeRelease(msg)释放消息msg的引用计数。

processMqttMsg 方法接收一个MqttMessage对象作为参数,然后根据消息的类型进行相应的处理。

实现原理:

  1. 首先,它检查消息的固定头部是否为空,如果为空,则抛出一个协议违反异常。
  2. 接下来,它检查客户端ID(clientId)是否为空。如果是连接消息(CONNECT),则初始化会话(initSession方法);否则,如果clientId为空,则抛出一个协议违反异常。
  3. 然后,它记录客户端日志(clientLogger)并使用一个switch语句根据消息类型进行相应的处理。

用途:

这个方法主要用于处理MQTT消息,根据消息类型进行相应的处理。

例如,当收到CONNECT消息时,它需要初始化会话;收到SUBSCRIBE消息时,需要订阅topic;收到PUBLISH消息时,需要发布消息等。

ClientMqttActorManagerImpl

这个代码是一个Java实现的MQTT客户端 ActorSystem,用于处理MQTT消息。以下是这个代码的主要实现原理、用途和注意事项:

  1. 实现原理:这个类使用了Akka框架的ActorSystem来处理MQTT消息。ActorSystem是一个用于组织和管理Akkaactor的层次结构。Actor是Akka的核心概念,它允许代码以消息传递的方式编写,从而实现并行和无阻塞的异步编程。在这个类中,ClientMqttActorManagerImpl作为管理器,负责创建、配置和控制Actor的创建和消息处理。
  2. 用途:这个类的主要用途是处理MQTT消息,特别是连接、断连、订阅和取消订阅等消息。通过这个类,客户端可以与ActorSystem通信,从而处理MQTT消息。

Akka 是一个用于开发并发应用程序的Java框架,它基于Akka Actor模型。ActorSystem是一个用于组织和管理Akkaactor的层次结构。Actor是Akka的核心概念,它允许代码以消息传递的方式编写,从而实现并行和无阻塞的异步编程。ActorSystem的主要作用是创建、配置和控制Actor的创建和消息处理。

在这个系统中,Actor是基本的通信单元。它们通过消息传递进行通信,而不是直接调用其他方法。这种设计允许代码以一种灵活的方式编写,从而适应并发编程的要求。ActorSystem的主要组件包括:

  1. Celluloid:这是一个用于协调并发行为的库,它提供了基本的线程安全和通信机制。
  2. JSR 229:这是一个Java标准规范,它定义了一个用于处理消息的API。Akka使用JSR 229 API来实现Actor之间的通信。
  3. ThreadPool:这是一个线程池,它用于在Actor系统中共享线程资源。这样可以提高性能,避免频繁创建和销毁线程。
  4. Mailbox:这是一个消息存储结构,用于存储Actor收到的消息。Mailbox可以确保消息的安全性和顺序性。
  5. ActorRef:这是一个引用,用于表示Actor的实例。ActorRef允许其他Actor通过其访问和操作Actor。
  6. Manager:这是一个高级的Actor管理器,它负责创建、配置和控制Actor的创建和消息处理。
  7. ActorSystem:这是一个全局的Actor系统,它负责管理所有的Actor和消息。

总之,Akka框架的主要特性是并行编程,它通过ActorSystem来实现。ActorSystem是一个层次结构,由Actor组成,它们通过消息传递进行通信。这种设计使得代码易于理解和维护,同时满足并发编程的要求。

TbActorMailbox

这个代码是一个实现高优先级消息处理的Java类。它主要用于在消息队列中处理高优先级消息,从而提高性能。以下是这个类的详细解释:

  1. 定义了两个常量HIGH_PRIORITYNORMAL_PRIORITY来表示高优先级和普通优先级。
  2. 定义了两个常量FREEBUSY来表示邮箱的空闲状态和处理消息的状态。
  3. 定义了两个常量NOT_READYREADY来表示邮箱未准备好接收消息和已准备好接收消息的状态。
  4. 使用了ConcurrentLinkedQueue来存储消息,以提高性能。
  5. initActor方法用于初始化 Actor,它会将初始化任务分配给线程池执行。
  6. enqueue方法用于将消息添加到邮箱中,它会根据消息的优先级将消息添加到相应的队列中。
  7. tryProcessQueue方法用于处理消息,它会检查邮箱是否已准备好接收消息,是否在destroyInProgress中,以及是否还有消息需要处理。如果满足条件,它会调用processMailbox方法来处理消息。
  8. processMailbox方法用于处理消息,它会从队列中取出消息,并调用actor.process方法来处理消息。
  9. getSelf方法用于获取当前的ActorId。
  10. tell方法用于发送消息给子Actor,它会将消息发送给系统。
  11. broadcastToChildren方法用于向子Actor广播消息,它会将消息广播给系统中满足条件的子Actor。
  12. filterChildren方法用于过滤子Actor,它会返回满足条件的子Actor列表。
  13. stop方法用于停止Actor,它会将停止消息发送给系统。
  14. getOrCreateChildActor方法用于创建或获取子Actor,它会根据条件来创建或获取子Actor。
  15. destroy方法用于销毁Actor,它会设置destroyInProgress为true,然后销毁Actor。
  16. 实现了TbActorId接口,该接口定义了Actor的唯一标识。
  17. telltellWithHighPriority方法用于发送消息给子Actor,它会根据消息的优先级将消息添加到相应的队列中。

通过实现这个类,可以在Actor系统中处理高优先级消息,从而提高性能。

MQ消息主题:

  • tbmq.client.session
    • ClientSessionConsumer
  • tbmq.client.subscriptions
    • ClientSubscriptionConsumer
  • tbmq.client.session.event.request
    • ClientSessionEventConsumer
  • tbmq.msg.persisted
  • tbmq.msg.retained
    • RetainedMsgConsumer
  • tbmq.client.disconnect.chensoulMacBook.local
    • DisconnectClientCommandConsumer
  • tbmq.client.session.event.response.chensoulMacBook.local
  • tbmq.msg.downlink.basic.chensoulMacBook.local
  • tbmq.msg.app.1b2512eca293401e8be2f7376becfb6273758da77822acc28f9debd557f1d97a
  • tbmq.sys.historical.data
    • HistoricalStatsTotalConsumer
Share this post:

Related content