TBMQ测试和源码分析
启动 TBMQ 程序,浏览器访问 http://localhost:8083,创建一个 Application,Credentials Type 为 BASIC,客户端 ID、用户名和密码均设置为 tbmq_app
使用 mosquitto 测试订阅消息
mosquitto_sub -d -q 1 -h localhost -p 1883 -t tbmq/demo/+ -i 'tbmq_app' -u 'tbmq_app' -P 'tbmq_app' -c -v
Debug 调试 MqttSessionHandler 类的
channelRead(ChannelHandlerContext ctx, Object msg)
方法第一次连接,消息类型是 CONNECT
消息:
固定头:MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=40]
变量头:MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=true, hasPassword=true, isWillRetain=false, isWillFlag=false, isCleanSession=false, keepAliveTimeSeconds=60]
负载:MqttConnectPayload[clientIdentifier=tbmq_app, willTopic=null, willMessage=null, userName=tbmq_app, password=[116, 98, 109, 113, 95, 97, 112, 112]]
address 为空,故从 ChannelHandlerContext 获取客户端地址,并将地址保持到客户端 Session 上下文 ClientSessionCtx
初始化 Session:获取 clientI、mqtt 版本,调用 ClientMqttActorManager 初始化 session:创建 clientActorRef,发送一个 SessionInitMsg 消息
调用 ClientMqttActorManager connect 方法,发送一个 MqttConnectMsg 消息
第二次连接,消息类型是 SUBSCRIBE
MqttSubscribeMessage[fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=16], variableHeader=MqttMessageIdAndPropertiesVariableHeader[messageId=1, properties=io.netty.handler.codec.mqtt.MqttProperties@15c52601], payload=MqttSubscribePayload[MqttTopicSubscription[topicFilter=tbmq/demo/+, option=SubscriptionOption[qos=AT_LEAST_ONCE, noLocal=false, retainAsPublished=false, retainHandling=SEND_AT_SUBSCRIBE]]]]
第三次连接,消息类型是 PINGREQ
MqttSessionHandler
第一次连接时,ClientMqttActorManagerImpl 初始化代码
@Override
public void initSession(String clientId, boolean isClientIdGenerated, SessionInitMsg sessionInitMsg) {
TbActorRef clientActorRef = getActor(clientId);
if (clientActorRef == null) {
clientActorRef = createRootActor(clientId, isClientIdGenerated);
}
clientActorRef.tellWithHighPriority(sessionInitMsg);
}
可以看到:
MqttSessionHandler 将消息的处理委托给了 ClientMqttActorManager 对象。ClientMqttActorManager使用了 Actor 模型。
ClientMqttActorManager
ActorSystemContext
TbActorSystem
通过 ClientActorCreator 创建一个 TbActorCtx,其实是创建了一个 TbActorMailbox
private TbActorRef createRootActor(String clientId, boolean isClientIdGenerated) { return actorSystem.createRootActor(ActorSystemLifecycle.CLIENT_DISPATCHER_NAME, new ClientActorCreator(actorSystemContext, clientId, isClientIdGenerated)); }
ClientActorCreator 负责创建 ClientActor
NettyMqttConverter 负责将 MqttMessage 转换为 TbActorMsg。
clientActorRef.tellWithHighPriority 发送了一个高优先级的消息:
clientActorRef.tellWithHighPriority(sessionInitMsg)
。这个方法是 Actor 模型定义的方法,调用该方法意味着程序的执行进入了 Actor。关于 Actor 具体的执行逻辑,可以先忽略,主要关注 Actor 是在什么时候、怎么调用业务代码的(即如何处理 MQTT 消息的逻辑)。- 调用 TbActorMailbox的 enqueue(TbActorMsg msg, boolean highPriority) 方法
- 调用 tryProcessQueue(true);
- 调用 processMailbox() 方法
- 调用 actor.process(msg); 方法,这里的 actor 是由 ClientActorCreator 创建的 ClientActor
- 调用 ClientActor 的actor.process(msg); 方法
- 如果 TimedMsg 则 clientActorStats.logMsgQueueTime(msg, TimeUnit.NANOSECONDS);
- clientLogger 记录事件
- 如果是队列消息 QueueableMqttMsg,调用 processQueueableMqttMsg((QueueableMqttMsg) msg); 方法
- 如果是其他消息:
- SessionInitMsg:ActorProcessor
- 创建 AuthContext
- 调用 AuthResponse authenticateClient(AuthContext authContext)
- 调用 authenticationService.authenticate(authContext)
- 调用 MqttClientAuthProvider的authenticate(AuthContext authContext)
- 调用 BasicMqttClientAuthProvider
- 认证密码 BasicMqttCredentials
- 调用 authorizationRuleService.parseBasicAuthorizationRule(credentials) 获取 AuthRulePatterns
- 返回 AuthResponse
- 调用 BasicMqttClientAuthProvider
- 调用 MqttClientAuthProvider的authenticate(AuthContext authContext)
- 调用 authenticationService.authenticate(authContext)
- finishSessionAuth(state.getClientId(), sessionCtx, authResponse) : ClientSessionCtx 保存客户端类型、authRulePatterns
- 更新 ClientActorState
- MQTT_CONNECT_MSG:
- ConnectService startConnection
- ClientSessionCtx 保存 SessionInfo
- ClientSessionCtx 设置 TopicAliasCtx
- KeepAliveService 注册 session
- ClientSessionEventService requestConnection(SessionInfo sessionInfo)
- 发送 ClientSessionEventProto 到 MQ
- ConnectService startConnection
- 调用 ClientActor 的actor.process(msg); 方法
- 调用 actor.process(msg); 方法,这里的 actor 是由 ClientActorCreator 创建的 ClientActor
- 调用 processMailbox() 方法
- 调用 tryProcessQueue(true);
- 调用 TbActorMailbox的 enqueue(TbActorMsg msg, boolean highPriority) 方法
MqttSessionHandler 类调用关系:
MqttSessionHandler
ClientMqttActorManager
TbActorMailbox
ContextAwareActor
ClientActor
1:SessionInitMsg
- ActorProcessor
- AuthenticationService
- MqttClientAuthProvider
- MqttClientCredentialsService
- MqttClientCredentialsDao
- MqttClientCredentialsService
- MqttClientAuthProvider
- AuthenticationService
- ActorProcessor
- MqttConnectMsg
- ConnectService
- ClientSessionEventService
- TbQueueProducer
- ClientSessionEventConsumer
- ClientSessionEventActorManager
- TbActorRef
- ClientSessionEventActorManager
- ClientSessionEventConsumer
- TbQueueProducer
- ClientSessionEventService
- ConnectionRequestMsg
- SessionClusterManager
- ClientSessionService
- ClientSessionPersistenceService
- TbQueueProducer
- ClientSessionPersistenceService
- ClientSessionService
ConnectionAcceptedMsg
- ConnectService
MqttDisconnectMsg
- DisconnectService
SessionDisconnectedMsg
- SessionClusterManager
- ClientSessionService
- ClientSubscriptionService
- SessionClusterManager
MqttSessionHandler 是一个用于处理MQTT会话的Java类。它实现了ChannelInboundHandlerAdapter
和GenericFutureListener
接口,因此可以作为Netty框架中的处理组件。
实现原理:
- 类中包含了一个
sessionId
,用于标识每个会话。sessionId是随机生成的UUID,每次会话开始时都会重新生成一个新的sessionId。 - 类中包含了各种处理MQTT消息的方法,例如
processMqttMsg
方法。这个方法会根据接收到的MQTT消息类型进行相应的处理。例如,当收到CONNECT
消息时,会调用initSession
方法初始化会话。当收到PUBLISH
消息时,会检查速率限制并处理消息。 disconnect
方法用于断开与客户端的连接。当收到断开连接的消息时,会调用这个方法。这个方法会关闭客户端的通道,并发送一个MQTT断开连接消息给客户端。getAddress
方法用于获取客户端的连接地址。这个方法会从上下文中获取客户端的地址,如果获取失败,会使用远程地址作为备选方案。operationComplete
方法实现了GenericFutureListener
接口的方法。当一个操作完成时,会调用这个方法。在这个方法中,会检查clientId
是否为null
,如果是,则断开连接,发送一个MQTT断开连接消息给客户端。exceptionCaught
方法用于处理异常。当发生异常时,会调用这个方法。这个方法会记录异常信息,并断开连接。channelReadComplete
方法用于刷新缓冲区。当一条消息读取完成时,会调用这个方法。channelRead
方法是主要处理方法。当接收到数据时,会调用这个方法进行处理。这个方法会先检查address
是否为null
,如果是,则生成一个新的address
。然后,会根据接收到的MQTT消息类型进行相应的处理。
channelRead
方法用于处理来自网络通道的消息。具体来说,它处理的是MQTT协议的消息。MQTT是一种轻量级的通信协议,用于简化网络应用程序之间的通信。这段代码的主要作用是解析和处理来自客户端的MQTT消息。
实现原理:
- 首先,代码检查address是否为null,如果是,则调用getAddress(ctx)获取客户端的地址,并将其设置到address变量中。同时,将address设置到clientSessionCtx中。这样可以方便地在日志中记录客户端的地址。
- 如果log.isTraceEnabled()为true,则记录下当前处理的消息,以便于调试和日志分析。
- 将ctx设置到clientSessionCtx中,这样后续的处理都可以通过clientSessionCtx来进行。
- 检查接收到的消息msg是否是MqttMessage类型。如果不是,则记录一条警告信息,并断开与客户端的连接。
- 如果消息msg的解码结果不成功,则记录一条警告信息,并断开与客户端的连接。如果消息msg的解码结果是TooLongFrameException异常,则断开连接的原因是ON_PACKET_TOO_LARGE,否则断开连接的原因是ON_MALFORMED_PACKET。
- 调用processMqttMsg方法处理MqttMessage类型的消息。
- 在处理消息后, finally 块中,通过ReferenceCountUtil.safeRelease(msg)释放消息msg的引用计数。
processMqttMsg
方法接收一个MqttMessage对象作为参数,然后根据消息的类型进行相应的处理。
实现原理:
- 首先,它检查消息的固定头部是否为空,如果为空,则抛出一个协议违反异常。
- 接下来,它检查客户端ID(clientId)是否为空。如果是连接消息(CONNECT),则初始化会话(initSession方法);否则,如果clientId为空,则抛出一个协议违反异常。
- 然后,它记录客户端日志(clientLogger)并使用一个switch语句根据消息类型进行相应的处理。
用途:
这个方法主要用于处理MQTT消息,根据消息类型进行相应的处理。
例如,当收到CONNECT消息时,它需要初始化会话;收到SUBSCRIBE消息时,需要订阅topic;收到PUBLISH消息时,需要发布消息等。
ClientMqttActorManagerImpl
这个代码是一个Java实现的MQTT客户端 ActorSystem,用于处理MQTT消息。以下是这个代码的主要实现原理、用途和注意事项:
- 实现原理:这个类使用了Akka框架的ActorSystem来处理MQTT消息。ActorSystem是一个用于组织和管理Akkaactor的层次结构。Actor是Akka的核心概念,它允许代码以消息传递的方式编写,从而实现并行和无阻塞的异步编程。在这个类中,ClientMqttActorManagerImpl作为管理器,负责创建、配置和控制Actor的创建和消息处理。
- 用途:这个类的主要用途是处理MQTT消息,特别是连接、断连、订阅和取消订阅等消息。通过这个类,客户端可以与ActorSystem通信,从而处理MQTT消息。
Akka 是一个用于开发并发应用程序的Java框架,它基于Akka Actor模型。ActorSystem是一个用于组织和管理Akkaactor的层次结构。Actor是Akka的核心概念,它允许代码以消息传递的方式编写,从而实现并行和无阻塞的异步编程。ActorSystem的主要作用是创建、配置和控制Actor的创建和消息处理。
在这个系统中,Actor是基本的通信单元。它们通过消息传递进行通信,而不是直接调用其他方法。这种设计允许代码以一种灵活的方式编写,从而适应并发编程的要求。ActorSystem的主要组件包括:
- Celluloid:这是一个用于协调并发行为的库,它提供了基本的线程安全和通信机制。
- JSR 229:这是一个Java标准规范,它定义了一个用于处理消息的API。Akka使用JSR 229 API来实现Actor之间的通信。
- ThreadPool:这是一个线程池,它用于在Actor系统中共享线程资源。这样可以提高性能,避免频繁创建和销毁线程。
- Mailbox:这是一个消息存储结构,用于存储Actor收到的消息。Mailbox可以确保消息的安全性和顺序性。
- ActorRef:这是一个引用,用于表示Actor的实例。ActorRef允许其他Actor通过其访问和操作Actor。
- Manager:这是一个高级的Actor管理器,它负责创建、配置和控制Actor的创建和消息处理。
- ActorSystem:这是一个全局的Actor系统,它负责管理所有的Actor和消息。
总之,Akka框架的主要特性是并行编程,它通过ActorSystem来实现。ActorSystem是一个层次结构,由Actor组成,它们通过消息传递进行通信。这种设计使得代码易于理解和维护,同时满足并发编程的要求。
TbActorMailbox
这个代码是一个实现高优先级消息处理的Java类。它主要用于在消息队列中处理高优先级消息,从而提高性能。以下是这个类的详细解释:
- 定义了两个常量
HIGH_PRIORITY
和NORMAL_PRIORITY
来表示高优先级和普通优先级。 - 定义了两个常量
FREE
和BUSY
来表示邮箱的空闲状态和处理消息的状态。 - 定义了两个常量
NOT_READY
和READY
来表示邮箱未准备好接收消息和已准备好接收消息的状态。 - 使用了
ConcurrentLinkedQueue
来存储消息,以提高性能。 initActor
方法用于初始化 Actor,它会将初始化任务分配给线程池执行。enqueue
方法用于将消息添加到邮箱中,它会根据消息的优先级将消息添加到相应的队列中。tryProcessQueue
方法用于处理消息,它会检查邮箱是否已准备好接收消息,是否在destroyInProgress中,以及是否还有消息需要处理。如果满足条件,它会调用processMailbox
方法来处理消息。processMailbox
方法用于处理消息,它会从队列中取出消息,并调用actor.process
方法来处理消息。getSelf
方法用于获取当前的ActorId。tell
方法用于发送消息给子Actor,它会将消息发送给系统。broadcastToChildren
方法用于向子Actor广播消息,它会将消息广播给系统中满足条件的子Actor。filterChildren
方法用于过滤子Actor,它会返回满足条件的子Actor列表。stop
方法用于停止Actor,它会将停止消息发送给系统。getOrCreateChildActor
方法用于创建或获取子Actor,它会根据条件来创建或获取子Actor。destroy
方法用于销毁Actor,它会设置destroyInProgress
为true,然后销毁Actor。- 实现了
TbActorId
接口,该接口定义了Actor的唯一标识。 tell
和tellWithHighPriority
方法用于发送消息给子Actor,它会根据消息的优先级将消息添加到相应的队列中。
通过实现这个类,可以在Actor系统中处理高优先级消息,从而提高性能。
MQ消息主题:
- tbmq.client.session
- ClientSessionConsumer
- tbmq.client.subscriptions
- ClientSubscriptionConsumer
- tbmq.client.session.event.request
- ClientSessionEventConsumer
- tbmq.msg.persisted
- tbmq.msg.retained
- RetainedMsgConsumer
- tbmq.client.disconnect.chensoulMacBook.local
- DisconnectClientCommandConsumer
- tbmq.client.session.event.response.chensoulMacBook.local
- tbmq.msg.downlink.basic.chensoulMacBook.local
- tbmq.msg.app.1b2512eca293401e8be2f7376becfb6273758da77822acc28f9debd557f1d97a
- tbmq.sys.historical.data
- HistoricalStatsTotalConsumer