How to Write an MQTT Gateway

Abstract: The Internet of Things (IoT) is currently a popular field of software. Many IoT vendors run their own IoT platforms, and one of the core modules of such a platform is the MQTT gateway.

This article is shared from the Huawei Cloud Community post "This article takes you to master the technical principles behind the construction of the mqtt gateway of the Internet of Things", author: Zhang Jian.

Foreword

The Internet of Things (IoT) is currently a popular field of software. Many IoT vendors run their own IoT platforms, and one of the core modules of such a platform is the MQTT gateway. This article shows how to write an MQTT gateway that uses Kafka or Pulsar as back-end storage and supports MQTT connect, disconnect, publish, and subscribe. Technology selection:

  • Netty: Java's most popular network framework
  • netty-codec-mqtt: a Netty subproject that provides the MQTT codec
  • Pulsar/Kafka: popular message middleware, used as back-end storage

The core POM dependencies are as follows:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-codec-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-original</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>${mqtt-client.version}</version>
    <scope>test</scope>
</dependency>

Software parameter design

Configuration parameters are common in software. Complex open-source projects can have hundreds of parameters, and their configuration files can run to thousands of lines. The configuration we need is:

The port that MqttServer listens on

Making the listening port configurable is necessary even for a demo, because it is used heavily in unit tests. After a unit test finishes, the operating system may not release the port immediately even though the network server has been closed, so it is important to use a random port in unit tests. In Java, we can obtain a free port with the utility class below. If no port is configured, we use the default MQTT port, 1883.

package io.github.protocol.mqtt.broker.util;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class SocketUtil {

    // Bind to port 0 so the OS picks a free ephemeral port, then release it.
    public static int getFreePort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
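As a minimal sketch of how such a helper might be exercised in a test, the block below repeats the helper so that it is self-contained. The class name FreePortDemo and the canBind method are hypothetical and not part of the original project. Note that there is a short window between releasing the probe socket and binding the real server in which another process could take the port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical demo class; getFreePort mirrors SocketUtil above.
final class FreePortDemo {

    // Ask the OS for an ephemeral port by binding to port 0, then release it.
    static int getFreePort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if a server socket can be bound on the given port right now.
    static boolean canBind(int port) {
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            return serverSocket.getLocalPort() == port;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int port = getFreePort();
        System.out.println("free port " + port + ", bindable: " + canBind(port));
    }
}
```

In a unit test, the returned port would be passed to the server configuration before the server starts.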

Backend storage configuration

Our MQTT gateway has no reliable storage of its own and relies on the back-end message middleware for persistence. We plan to support two back ends, Pulsar and Kafka. The enumeration is defined as follows:

public enum ProcessorType {
    KAFKA,
    PULSAR,
}

The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple and contain only the basic connection addresses. If performance tuning and security are added later, more configuration items will be needed here.

import lombok.Getter;
import lombok.Setter;

@Setter
@Getter
public class KafkaProcessorConfig {

    private String bootstrapServers = "localhost:9092";

    public KafkaProcessorConfig() {
    }
}

@Setter
@Getter
public class PulsarProcessorConfig {

    private String httpUrl = "http://localhost:8080";

    private String serviceUrl = "pulsar://localhost:6650";

    public PulsarProcessorConfig() {
    }
}
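The server code later in this article reads its settings from an MqttServerConfig object that is not listed. The following is only a sketch of what such a class might look like, written without Lombok so that it stands alone. The accessor names are inferred from the calls made elsewhere in the article (getPort, setPort, getProcessorType, getMqttAuth, getKafkaProcessorConfig, getPulsarProcessorConfig); the class name MqttServerConfigSketch, the nested stand-in types, and the defaults for processorType and mqttAuth are assumptions.

```java
// Sketch only: field names are inferred from the getters used elsewhere in this article.
final class MqttServerConfigSketch {

    enum ProcessorType { KAFKA, PULSAR }

    // Stand-in for the MqttAuth type used by the processors.
    interface MqttAuth {
        boolean connAuth(String clientId, String username, byte[] password);
    }

    // Simplified stand-ins for the two back-end configuration classes.
    static final class KafkaProcessorConfig {
        String bootstrapServers = "localhost:9092";
    }

    static final class PulsarProcessorConfig {
        String httpUrl = "http://localhost:8080";
        String serviceUrl = "pulsar://localhost:6650";
    }

    // 1883 is the default MQTT port; 0 means "pick a free port at startup".
    private int port = 1883;
    // Assumed default back end.
    private ProcessorType processorType = ProcessorType.KAFKA;
    // Assumed default: accept every client.
    private MqttAuth mqttAuth = (clientId, username, password) -> true;
    private KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
    private PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();

    int getPort() { return port; }

    void setPort(int port) { this.port = port; }

    ProcessorType getProcessorType() { return processorType; }

    void setProcessorType(ProcessorType processorType) { this.processorType = processorType; }

    MqttAuth getMqttAuth() { return mqttAuth; }

    void setMqttAuth(MqttAuth mqttAuth) { this.mqttAuth = mqttAuth; }

    KafkaProcessorConfig getKafkaProcessorConfig() { return kafkaProcessorConfig; }

    PulsarProcessorConfig getPulsarProcessorConfig() { return pulsarProcessorConfig; }
}
```

A unit test would typically call setPort(0) so that the server picks a free port at startup.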

Start netty-MqttServer

We start an MQTT server with Netty and add the MQTT decoder and encoder to the pipeline:

package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

 private final MqttServerConfig mqttServerConfig;

 public MqttServer() {
 this(new MqttServerConfig());
    }

 public MqttServer(MqttServerConfig mqttServerConfig) {
 this.mqttServerConfig = mqttServerConfig;
 if (mqttServerConfig.getPort() == 0) {
            mqttServerConfig.setPort(SocketUtil.getFreePort());
        }
    }

 public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
 public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
 // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

 // Start the server.
            ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();

 // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
 // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

 private MqttProcessor processor(MqttServerConfig config) {
 return switch (config.getProcessorType()) {
 case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
 case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
        };
    }

 public int getPort() {
 return mqttServerConfig.getPort();
    }

}

MqttServerStarter.java

We write a simple main function that starts the MQTT server for easy debugging:

package io.github.protocol.mqtt.broker;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }

}

The client uses the Eclipse Paho MQTT client for testing:

package io.github.protocol.mqtt;

import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Log4j2
public class MqttClientPublishExample {

 public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishExample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

 try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            log.info("Connecting to broker: {}", broker);
            sampleClient.connect(connOpts);
            log.info("Connected");
            log.info("Publishing message: {}", content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            log.info("Message published");
            sampleClient.disconnect();
            log.info("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
        }
    }

}

We start MqttServer first and then run the client, and find that the client hangs at:

Connecting to broker: tcp://127.0.0.1:1883

Why is this? A packet capture shows that the client sent an MQTT CONNECT packet but the server never responded.

However, the MQTT specification requires the server to answer a CONNECT packet with a CONNACK.

So we need to return a CONNACK message after receiving CONNECT. We create an MqttHandler that extends ChannelInboundHandlerAdapter and receives the messages decoded by MqttDecoder. The two methods to focus on overriding are channelRead, which handles incoming messages, and channelInactive, which releases resources when the connection is closed.

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 super.channelRead(ctx, msg);
    }

}

Then add this handler to Netty's pipeline (its chain of responsibility), after the decoder.

Insert our code into MqttHandler:

@Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 super.channelRead(ctx, msg);
 if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

 private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
    }

The printed connectMessage looks like this:

[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
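As a sanity check on the remainingLength=22 shown in the output, the following sketch recomputes it from the MQTT 3.1.1 CONNECT layout for a packet that carries no will, user name, or password. The class and method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Worked check of remainingLength for an MQTT 3.1.1 CONNECT packet
// that has no will, user name or password.
final class ConnectRemainingLength {

    // An MQTT UTF-8 string is a 2-byte length prefix followed by the encoded bytes.
    static int utf8FieldLength(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static int remainingLength(String clientId) {
        int variableHeader = utf8FieldLength("MQTT") // protocol name: 2 + 4 bytes
                + 1                                  // protocol level (4 for MQTT 3.1.1)
                + 1                                  // connect flags
                + 2;                                 // keep-alive in seconds
        int payload = utf8FieldLength(clientId);     // only the client identifier is present
        return variableHeader + payload;
    }

    public static void main(String[] args) {
        System.out.println(remainingLength("JavaSample")); // prints 22
    }
}
```

The variable header contributes 10 bytes and the client identifier "JavaSample" contributes 2 + 10 bytes, which matches the decoded value.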

An MQTT CONNECT message usually carries information such as the will QoS, user name, and password. Since the client did not supply a user name or password, those fields are null here. For now we skip validating these fields and directly return a CONNACK message to the client, indicating that the connection succeeded:

        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
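On the wire, an MQTT 3.1.1 CONNACK is only four bytes long. The sketch below builds those bytes by hand to show what the encoder produces for an accepted connection; it is an illustration of the packet layout, not how the gateway itself writes the response, and the class name ConnAckBytes is hypothetical.

```java
// Hand-built MQTT 3.1.1 CONNACK packet, for illustration only.
final class ConnAckBytes {

    static byte[] encode(boolean sessionPresent, int returnCode) {
        return new byte[] {
            0x20,                                  // fixed header: packet type CONNACK (2) in the high nibble
            0x02,                                  // remaining length: two bytes follow
            (byte) (sessionPresent ? 0x01 : 0x00), // acknowledge flags: bit 0 is "session present"
            (byte) returnCode                      // return code: 0 means connection accepted
        };
    }

    public static void main(String[] args) {
        for (byte b : encode(false, 0)) {
            System.out.printf("%02x ", b);
        }
        System.out.println();
    }
}
```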

We run the server and client again and can see that the CONNECT stage now passes and the client moves on to publishing a message. Next we will implement the other scenarios.

The MqttHandler code at this stage is attached below:

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 super.channelRead(ctx, msg);
 if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

 private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload connectPayload = connectMessage.payload();
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
    }

}

So far all the logic lives in MqttHandler, which makes later extension inconvenient. We therefore abstract an MqttProcessor interface to process specific requests, while MqttHandler is responsible only for determining the MqttMessage type and dispatching it. The MqttProcessor interface is designed as follows:

package io.github.protocol.mqtt.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;

public interface MqttProcessor {

    void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;

    void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;

    void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;

    void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;

    void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;

    void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;

    void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;

    void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;

    void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processDisconnect(ChannelHandlerContext ctx) throws Exception;

    void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

}

We allow these methods to throw exceptions. On an unrecoverable failure (such as a back-end storage failure), we close the MQTT connection and wait for the client to reconnect.

The MqttHandler code that calls MqttProcessor is as follows:

      Preconditions.checkArgument(message instanceof MqttMessage);
        MqttMessage msg = (MqttMessage) message;
 try {
 if (msg.decoderResult().isFailure()) {
                Throwable cause = msg.decoderResult().cause();
 if (cause instanceof MqttUnacceptableProtocolVersionException) {
 // Unsupported protocol version
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
 new MqttFixedHeader(MqttMessageType.CONNACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0),
 new MqttConnAckVariableHeader(
                                    MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
 false), null);
                    ctx.writeAndFlush(connAckMessage);
                    log.error("connection refused due to invalid protocol, client address [{}]",
                            ctx.channel().remoteAddress());
                    ctx.close();
 return;
                } else if (cause instanceof MqttIdentifierRejectedException) {
 // ineligible clientId
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
 new MqttFixedHeader(MqttMessageType.CONNACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0),
 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
 false), null);
                    ctx.writeAndFlush(connAckMessage);
                    log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
                    ctx.close();
 return;
                }
 throw new IllegalStateException(msg.decoderResult().cause().getMessage());
            }
            MqttMessageType messageType = msg.fixedHeader().messageType();
 if (log.isDebugEnabled()) {
                log.debug("Processing MQTT Inbound handler message, type={}", messageType);
            }
 switch (messageType) {
 case CONNECT:
                    Preconditions.checkArgument(msg instanceof MqttConnectMessage);
                    processor.processConnect(ctx, (MqttConnectMessage) msg);
 break;
 case CONNACK:
                    Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
                    processor.processConnAck(ctx, (MqttConnAckMessage) msg);
 break;
 case PUBLISH:
                    Preconditions.checkArgument(msg instanceof MqttPublishMessage);
                    processor.processPublish(ctx, (MqttPublishMessage) msg);
 break;
 case PUBACK:
                    Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
                    processor.processPubAck(ctx, (MqttPubAckMessage) msg);
 break;
 case PUBREC:
                    processor.processPubRec(ctx, msg);
 break;
 case PUBREL:
                    processor.processPubRel(ctx, msg);
 break;
 case PUBCOMP:
                    processor.processPubComp(ctx, msg);
 break;
 case SUBSCRIBE:
                    Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
                    processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
 break;
 case SUBACK:
                    Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
                    processor.processSubAck(ctx, (MqttSubAckMessage) msg);
 break;
 case UNSUBSCRIBE:
                    Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
                    processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
 break;
 case UNSUBACK:
                    Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
                    processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
 break;
 case PINGREQ:
                    processor.processPingReq(ctx, msg);
 break;
 case PINGRESP:
                    processor.processPingResp(ctx, msg);
 break;
 case DISCONNECT:
                    processor.processDisconnect(ctx);
 break;
 case AUTH:
                    processor.processAuth(ctx, msg);
 break;
 default:
 throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
            }
        } catch (Throwable ex) {
            ReferenceCountUtil.safeRelease(msg);
            log.error("Exception was caught while processing MQTT message, ", ex);
            ctx.close();
        }

This code mainly dispatches each type of MqttMessage to the corresponding MqttProcessor method. Two points are worth mentioning:

  • Decoding errors are checked up front so that the handler fails fast
  • Exceptions are caught globally, and the connection is closed when one occurs
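The two points above can be sketched without Netty. In the following simplified example, every type (DispatchSketch, Connection, Handler, and the reduced MessageType enum) is a hypothetical stand-in for the corresponding Netty or gateway type; only the control flow is meant to match.

```java
import java.util.EnumMap;
import java.util.Map;

// Simplified stand-in for the dispatch logic in MqttHandler.
final class DispatchSketch {

    enum MessageType { CONNECT, PUBLISH, PINGREQ }

    // Stand-in for a network connection; only close() matters here.
    static final class Connection {
        boolean closed;

        void close() {
            closed = true;
        }
    }

    // Stand-in for one MqttProcessor method; it is allowed to throw.
    interface Handler {
        void handle(Connection conn, String payload) throws Exception;
    }

    private final Map<MessageType, Handler> handlers = new EnumMap<>(MessageType.class);

    void register(MessageType type, Handler handler) {
        handlers.put(type, handler);
    }

    void dispatch(Connection conn, MessageType type, String payload, boolean decodeFailed) {
        try {
            // Fail fast: reject messages that failed decoding before looking at their type.
            if (decodeFailed) {
                throw new IllegalStateException("decode failure");
            }
            Handler handler = handlers.get(type);
            if (handler == null) {
                throw new UnsupportedOperationException("Unknown MessageType: " + type);
            }
            handler.handle(conn, payload);
        } catch (Exception ex) {
            // Global catch: any failure closes the connection; the client is expected to reconnect.
            conn.close();
        }
    }
}
```

With this shape, individual handlers do not need their own error handling for unrecoverable failures; throwing is enough to drop the connection.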

Maintain MqttSession

The MQTT session is used to track client session information and the resources the client occupies in the system. Because the session must be maintained regardless of the back-end implementation, we build an AbstractProcessor to maintain the MqttSession:

package io.github.protocol.mqtt.broker.processor;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.util.ChannelUtils;
import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.stream.IntStream;

@Slf4j
public abstract class AbstractProcessor implements MqttProcessor {

 protected final MqttAuth mqttAuth;

 public AbstractProcessor(MqttAuth mqttAuth) {
 this.mqttAuth = mqttAuth;
    }

    @Override
 public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
        String clientId = msg.payload().clientIdentifier();
        String username = msg.payload().userName();
        byte[] pwd = msg.payload().passwordInBytes();
 if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
 new MqttFixedHeader(MqttMessageType.CONNACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0),
 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
 false), null);
            ctx.writeAndFlush(connAckMessage);
            log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
            ctx.close();
 return;
        }
 if (!mqttAuth.connAuth(clientId, username, pwd)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
 new MqttFixedHeader(MqttMessageType.CONNACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0),
 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
 false), null);
            ctx.writeAndFlush(connAckMessage);
            log.error("username or password is incorrect, client address[{}]", ctx.channel().remoteAddress());
            ctx.close();
 return;
        }

        MqttSessionKey mqttSessionKey = new MqttSessionKey();
        mqttSessionKey.setUsername(username);
        mqttSessionKey.setClientId(clientId);
        ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
        log.info("username {} clientId {} remote address {} connected",
                username, clientId, ctx.channel().remoteAddress());
        onConnect(mqttSessionKey);
        MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
 new MqttFixedHeader(MqttMessageType.CONNACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0),
 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
 null);
        ctx.writeAndFlush(mqttConnectMessage);
    }

 protected void onConnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
 public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
 return;
        }
 if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
            log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
 return;
        }
 if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
            log.error("does not support QoS2 protocol. clientId {}, username {} ",
                    mqttSession.getClientId(), mqttSession.getUsername());
 return;
        }
        onPublish(ctx, mqttSession, msg);
    }

 protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             MqttPublishMessage msg) throws Exception {
    }

    @Override
 public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            // Do not subscribe or send SUBACK for an unauthenticated client.
            return;
        }
        onSubscribe(ctx, mqttSession, msg.payload());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0);
        IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
        MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
        ctx.writeAndFlush(MqttMessageFactory.newMessage(
                fixedHeader,
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                payload));
    }

 protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
    }

    @Override
 public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        ctx.writeAndFlush(MqttMessageUtil.pingResp());
    }

    @Override
 public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
 public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
            // No session was registered, so there is nothing to clean up.
            return;
        }
        onDisconnect(mqttSession);
    }

 protected void onDisconnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
 public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
 if (mqttSession == null) {
            log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }
}

As can be seen, AbstractProcessor mainly maintains and verifies the MqttSessionKey, and rejects the unsupported QoS 2 and FAILURE levels in publish. It also responds to MQTT heartbeat (PINGREQ) requests. As before, we allow onPublish and onSubscribe to throw exceptions.
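The MqttSessionKey class itself is not listed in this article. Because the key is also used to look sessions up in hash maps, it needs value-based equals and hashCode. The sketch below assumes the two fields implied by setUsername and setClientId; the class name MqttSessionKeySketch is hypothetical, and making the fields immutable is a simplification that differs from the setter style used above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Sketch of a session key with value semantics; not the project's actual class.
final class MqttSessionKeySketch {

    private final String username;
    private final String clientId;

    MqttSessionKeySketch(String username, String clientId) {
        this.username = username;
        this.clientId = clientId;
    }

    String getUsername() {
        return username;
    }

    String getClientId() {
        return clientId;
    }

    // Two keys are equal when both the user name and the client id match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MqttSessionKeySketch)) {
            return false;
        }
        MqttSessionKeySketch other = (MqttSessionKeySketch) o;
        return Objects.equals(username, other.username) && Objects.equals(clientId, other.clientId);
    }

    // Must be consistent with equals so that HashMap lookups find the entry.
    @Override
    public int hashCode() {
        return Objects.hash(username, clientId);
    }

    public static void main(String[] args) {
        Map<MqttSessionKeySketch, String> sessions = new HashMap<>();
        sessions.put(new MqttSessionKeySketch("alice", "JavaSample"), "connected");
        // A freshly built key with the same values finds the stored entry.
        System.out.println(sessions.get(new MqttSessionKeySketch("alice", "JavaSample")));
    }
}
```

Without these two methods, a key rebuilt from the same user name and client id would not find the entry stored under the original key object.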

The basic idea of an MQTT gateway built on a message queue is simple: when a publish message arrives, the message is produced to the message queue; when there is a subscription, messages are pulled from the message queue. It follows that we may need to maintain the mapping between each MQTT topic and its producer and consumer, because the consumers of message middleware such as Kafka and Pulsar are bound to topics. The general shape of the code is as follows:

protected final ReentrantReadWriteLock.ReadLock rLock;

 protected final ReentrantReadWriteLock.WriteLock wLock;

 protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;

 protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;

 protected final Map<MqttTopicKey, P> producerMap;

 protected final Map<MqttTopicKey, C> consumerMap;

 public AbstractMqProcessor(MqttAuth mqttAuth) {
 super(mqttAuth);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        rLock = lock.readLock();
        wLock = lock.writeLock();
 this.sessionProducerMap = new HashMap<>();
 this.sessionConsumerMap = new HashMap<>();
 this.producerMap = new HashMap<>();
 this.consumerMap = new HashMap<>();
    }

    @Override
 protected void onConnect(MqttSessionKey mqttSessionKey) {
        wLock.lock();
 try {
            sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
            sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
        } finally {
            wLock.unlock();
        }
    }

    @Override
 protected void onDisconnect(MqttSessionKey mqttSessionKey) {
        wLock.lock();
 try {
 // find producers
            List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
 if (produceTopicKeys != null) {
 for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
                    P producer = producerMap.get(mqttTopicKey);
 if (producer != null) {
                        ClosableUtils.close(producer);
                        producerMap.remove(mqttTopicKey);
                    }
                }
            }
            sessionProducerMap.remove(mqttSessionKey);
            List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
 if (consumeTopicKeys != null) {
 for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
                    C consumer = consumerMap.get(mqttTopicKey);
 if (consumer != null) {
                        ClosableUtils.close(consumer);
                        consumerMap.remove(mqttTopicKey);
                    }
                }
            }
            sessionConsumerMap.remove(mqttSessionKey);
        } finally {
            wLock.unlock();
        }
    }
}
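
The maps above are keyed by MqttSessionKey and MqttTopicKey, so both classes need value-based equals and hashCode. Otherwise a lookup with a freshly built key would never find the producer or consumer stored earlier. The following is only a minimal sketch and not the project's actual classes: SessionKeySketch and TopicKeySketch are hypothetical stand-ins, and their fields (username, clientId, topic) are assumptions inferred from the getters and setters used elsewhere in this article.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for MqttSessionKey (the fields are assumed).
final class SessionKeySketch {
    private final String username;
    private final String clientId;

    SessionKeySketch(String username, String clientId) {
        this.username = username;
        this.clientId = clientId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SessionKeySketch)) {
            return false;
        }
        SessionKeySketch that = (SessionKeySketch) o;
        return Objects.equals(username, that.username) && Objects.equals(clientId, that.clientId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(username, clientId);
    }
}

// Hypothetical stand-in for MqttTopicKey: one (session, topic) pair.
final class TopicKeySketch {
    private final SessionKeySketch sessionKey;
    private final String topic;

    TopicKeySketch(SessionKeySketch sessionKey, String topic) {
        this.sessionKey = sessionKey;
        this.topic = topic;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TopicKeySketch)) {
            return false;
        }
        TopicKeySketch that = (TopicKeySketch) o;
        return Objects.equals(sessionKey, that.sessionKey) && Objects.equals(topic, that.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionKey, topic);
    }

    public static void main(String[] args) {
        Map<TopicKeySketch, String> consumerMap = new HashMap<>();
        SessionKeySketch session = new SessionKeySketch("user", "client-1");
        consumerMap.put(new TopicKeySketch(session, "sensor/temp"), "consumer-for-sensor/temp");
        // A new but equal key finds the entry that was stored with the first key.
        System.out.println(consumerMap.get(
                new TopicKeySketch(new SessionKeySketch("user", "client-1"), "sensor/temp")));
    }
}
```

With value-based keys, the same session subscribing to the same topic twice maps to one entry, while another session on the same topic gets its own entry.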

kafka processor implementation

Because the kafka producer is not bound to a topic, we can reuse a single producer inside the kafka processor. Later, if a single kafka producer becomes a performance bottleneck, we can extend it to a list of kafka producers and spread requests across them. Consumers are different: under the mqtt protocol each subscribed topic may behave differently, so it is not suitable to reuse the same consumer instance. We create the KafkaProducer in the constructor:

 private final KafkaProcessorConfig kafkaProcessorConfig;

 private final KafkaProducer<String, ByteBuffer> producer;

 public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
 super(mqttAuth);
 this.kafkaProcessorConfig = kafkaProcessorConfig;
 this.producer = createProducer();
    }

 protected KafkaProducer<String, ByteBuffer> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
 return new KafkaProducer<>(properties);
    }

To handle an MqttPublish message, note that it carries the following key parameters:

MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
String topic = publishMessage.variableHeader().topicName();
ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();

where

  • qos is the quality level of this message: 0 means at most once (no guarantee), 1 means at least once, and 2 means exactly once. Currently only qos0 and qos1 are supported
  • topicName is the name of the topic
  • ByteBuffer is the content of the message

Messages are sent according to topic and qos. The code is as follows:

        String topic = msg.variableHeader().topicName();
        ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
        switch (msg.fixedHeader().qosLevel()) {
            case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
                    return;
                }
                log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                        mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
            });
            case AT_LEAST_ONCE -> {
                try {
                    RecordMetadata recordMetadata = producer.send(record).get();
                    log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                            mqttSessionKey, recordMetadata.topic(),
                            recordMetadata.partition(), recordMetadata.offset());
                    ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
                } catch (Exception e) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
                }
            }
            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }
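
Note that the qos1 branch above waits on producer.send(record).get() in the calling thread. One possible alternative is to write the PUBACK from the completion callback, so that the ack is still only issued after the backend has confirmed the write. The sketch below illustrates only that ordering, with a plain CompletableFuture standing in for the kafka send. QosAckSketch, Qos and sentAcks are hypothetical names used for illustration and are not part of the article's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class QosAckSketch {
    enum Qos { AT_MOST_ONCE, AT_LEAST_ONCE }

    // Packet ids for which a PUBACK would be written back to the client.
    final List<Integer> sentAcks = new CopyOnWriteArrayList<>();

    // backendSend is a stand-in (assumption) for the asynchronous send to the message queue.
    void handlePublish(Qos qos, int packetId, CompletableFuture<Void> backendSend) {
        switch (qos) {
            case AT_MOST_ONCE:
                // qos0: fire and forget, the client never receives an ack
                break;
            case AT_LEAST_ONCE:
                // qos1: ack only after the backend write succeeded;
                // thenRun is skipped when the future completes exceptionally
                backendSend.thenRun(() -> sentAcks.add(packetId));
                break;
            default:
                throw new IllegalStateException("unsupported qos " + qos);
        }
    }

    public static void main(String[] args) {
        QosAckSketch sketch = new QosAckSketch();
        CompletableFuture<Void> send = new CompletableFuture<>();
        sketch.handlePublish(Qos.AT_LEAST_ONCE, 7, send);
        System.out.println("acks before backend confirms: " + sketch.sentAcks.size());
        send.complete(null);
        System.out.println("acks after backend confirms: " + sketch.sentAcks.size());
    }
}
```

If the backend write fails, no ack is sent, and a qos1 client is expected to retransmit the message.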

To handle subscribe messages, for now we only need to create a consumer for the subscribed topic. The consumption pattern recommended by the kafka native client is as follows:

while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, byte[]> record : records) {
        // do logic
    }
}

This loop blocks the calling thread, so consumption has to run on a separate thread. We write a KafkaConsumerListenerWrapper that converts the poll loop into an asynchronous listener model:

package io.github.protocol.mqtt.broker.processor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

@Slf4j
public class KafkaConsumerListenerWrapper implements AutoCloseable {

 private final AdminClient adminClient;

 private final KafkaConsumer<String, byte[]> consumer;

 public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {
        Properties adminProperties = new Properties();
        adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
 this.adminClient = KafkaAdminClient.create(adminProperties);
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 this.consumer = new KafkaConsumer<>(properties);
    }

 public void start(String topic, KafkaMessageListener listener) throws Exception {
 try {
            TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))
                    .values().get(topic).get();
            log.info("topic info is {}", topicDescription);
        } catch (ExecutionException ee) {
 if (ee.getCause() instanceof UnknownTopicOrPartitionException) {
                log.info("topic {} not exist, create it", topic);
                adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));
            } else {
                log.error("find topic info {} error", topic, ee);
            }
        } catch (Exception e) {
 throw new IllegalStateException("find topic info error", e);
        }
        consumer.subscribe(Collections.singletonList(topic));
        log.info("consumer topic {} start", topic);
 new Thread(() -> {
 try {
 while (true) {
                    ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
 for (ConsumerRecord<String, byte[]> record : records) {
                        listener.messageReceived(record);
                    }
                }
            } catch (WakeupException we) {
                consumer.close();
            } catch (Exception e) {
                log.error("consumer topic {} consume error", topic, e);
                consumer.close();
            }
        }).start();
        Thread.sleep(5_000);
    }

    @Override
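    public void close() throws Exception {
        log.info("wake up {} consumer", consumer);
        // wakeup() makes the polling thread exit with a WakeupException and close the consumer
        consumer.wakeup();
        // release the admin client created in the constructor
        adminClient.close();
    }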
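}

In the kafka processor, onSubscribe creates one KafkaConsumerListenerWrapper per subscribed topic and starts it:

    @Override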
 protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
 for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
            KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());
            subscribe(ctx, consumer, topicSubscription.topicName());
        }
    }

 private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
        MqttTopicKey mqttTopicKey = new MqttTopicKey();
        mqttTopicKey.setTopic(topic);
        mqttTopicKey.setMqttSessionKey(mqttSessionKey);

        wLock.lock();
 try {
            KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
 if (consumer == null) {
                consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
                sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
 if (mqttTopicKeys == null) {
                        mqttTopicKeys = new ArrayList<>();
                    }
                    mqttTopicKeys.add(mqttTopicKey);
 return mqttTopicKeys;
                });
                consumerMap.put(mqttTopicKey, consumer);
            }
 return consumer;
        } finally {
            wLock.unlock();
        }
    }

 protected void subscribe(ChannelHandlerContext ctx,
                             KafkaConsumerListenerWrapper consumer, String topic) throws Exception {
        BoundInt boundInt = new BoundInt(65535);
        consumer.start(topic, record -> {
            log.info("receive message from kafka, topic {}, partition {}, offset {}",
                    record.topic(), record.partition(), record.offset());
            MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
                    MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
            ctx.writeAndFlush(mqttPublishMessage);
        });
    }

One point deserves attention throughout the code above: when logging, include the key information, such as the topic, mqtt username and mqtt clientId. It may not seem important when writing a demo, but when there are many requests and you need to locate a problem, you will appreciate having this information.

A simple utility class, BoundInt, generates the packetId, bounded by 65535, to meet the requirements of the protocol.
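
The source of BoundInt is not shown in this article. As a rough sketch of what such a bounded counter could look like, the hypothetical PacketIdGenerator below cycles through 1..max and then wraps around. It starts at 1 rather than 0 because the MQTT specification requires a non-zero packet identifier for publishes with qos greater than 0. Whether BoundInt itself skips 0 is not confirmed by the article.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical bounded counter; not the article's BoundInt implementation.
final class PacketIdGenerator {
    private final int max;
    private final AtomicInteger current = new AtomicInteger(0);

    PacketIdGenerator(int max) {
        if (max < 1) {
            throw new IllegalArgumentException("max must be >= 1");
        }
        this.max = max;
    }

    // Returns the next id in 1..max, wrapping back to 1 after max.
    // AtomicInteger keeps this safe when listener callbacks run on several threads.
    int nextVal() {
        return current.updateAndGet(v -> v >= max ? 1 : v + 1);
    }

    public static void main(String[] args) {
        PacketIdGenerator generator = new PacketIdGenerator(65535);
        System.out.println(generator.nextVal());
        System.out.println(generator.nextVal());
    }
}
```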

pulsar processor implementation

Compared with kafka, pulsar is better suited to backing an mqtt proxy. The reasons are as follows:

  • pulsar supports millions of topics, and its topics are more lightweight
  • pulsar natively supports a listener consumption mode, so there is no need to start a thread per consumer
  • pulsar supports the Shared subscription mode, which is more flexible
  • subscribe on a pulsar consumer guarantees that the subscription has been created once it returns; the kafka consumer has no such semantic guarantee

The fields and constructor of the pulsar processor are as follows:

 protected final ReentrantReadWriteLock.ReadLock rLock;

 protected final ReentrantReadWriteLock.WriteLock wLock;

 protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;

 protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;

 protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;

 protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;

 private final PulsarProcessorConfig pulsarProcessorConfig;

 private final PulsarAdmin pulsarAdmin;

 private final PulsarClient pulsarClient;

 public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
 super(mqttAuth);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        rLock = lock.readLock();
        wLock = lock.writeLock();
 this.sessionProducerMap = new HashMap<>();
 this.sessionConsumerMap = new HashMap<>();
 this.producerMap = new HashMap<>();
 this.consumerMap = new HashMap<>();
 this.pulsarProcessorConfig = pulsarProcessorConfig;
 try {
 this.pulsarAdmin = PulsarAdmin.builder()
                    .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
                    .build();
 this.pulsarClient = PulsarClient.builder()
                    .serviceUrl(pulsarProcessorConfig.getServiceUrl())
                    .build();
        } catch (Exception e) {
 throw new IllegalStateException("Failed to create pulsar client", e);
        }
    }

Handle the publish message

    @Override
 protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             MqttPublishMessage msg) throws Exception {
        String topic = msg.variableHeader().topicName();
        Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
        int len = msg.payload().readableBytes();
        byte[] messageBytes = new byte[len];
        msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
 switch (msg.fixedHeader().qosLevel()) {
 case AT_MOST_ONCE -> producer.sendAsync(messageBytes).
                    thenAccept(messageId -> log.info("clientId [{}],"
                                    + " username [{}]. send message to pulsar success messageId: {}",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
                    .exceptionally((e) -> {
                        log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
                                mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
 return null;
                    });
 case AT_LEAST_ONCE -> {
 try {
                    MessageId messageId = producer.send(messageBytes);
                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
 false, MqttQoS.AT_MOST_ONCE, false, 0);
                    MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
                            MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
                    log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
                    ctx.writeAndFlush(pubAckMessage);
                } catch (PulsarClientException e) {
                    log.error("clientId [{}], username [{}]. send pulsar error: {}",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());
                }
            }
 case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }
    }

 private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
        MqttTopicKey mqttTopicKey = new MqttTopicKey();
        mqttTopicKey.setTopic(topic);
        mqttTopicKey.setMqttSessionKey(mqttSessionKey);

        rLock.lock();
 try {
            Producer<byte[]> producer = producerMap.get(mqttTopicKey);
 if (producer != null) {
 return producer;
            }
        } finally {
            rLock.unlock();
        }

        wLock.lock();
 try {
            Producer<byte[]> producer = producerMap.get(mqttTopicKey);
 if (producer == null) {
                producer = createProducer(topic);
                sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
 if (mqttTopicKeys == null) {
                        mqttTopicKeys = new ArrayList<>();
                    }
                    mqttTopicKeys.add(mqttTopicKey);
 return mqttTopicKeys;
                });
                producerMap.put(mqttTopicKey, producer);
            }
 return producer;
        } finally {
            wLock.unlock();
        }
    }

 protected Producer<byte[]> createProducer(String topic) throws Exception {
 return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
    }

Handle the subscribe message

    @Override
 protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
 for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
            subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
        }
    }

 protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             String topic) throws Exception {
        MqttTopicKey mqttTopicKey = new MqttTopicKey();
        mqttTopicKey.setTopic(topic);
        mqttTopicKey.setMqttSessionKey(mqttSessionKey);

        wLock.lock();
 try {
            Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
 if (consumer == null) {
                consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
                sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
 if (mqttTopicKeys == null) {
                        mqttTopicKeys = new ArrayList<>();
                    }
                    mqttTopicKeys.add(mqttTopicKey);
 return mqttTopicKeys;
                });
                consumerMap.put(mqttTopicKey, consumer);
            }
        } finally {
            wLock.unlock();
        }
    }

 protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,
                                              String topic) throws Exception {
        BoundInt boundInt = new BoundInt(65535);
 try {
            PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
            log.info("topic {} partitioned stats {}", topic, partitionedStats);
        } catch (PulsarAdminException.NotFoundException nfe) {
            log.info("topic {} not found", topic);
            pulsarAdmin.topics().createPartitionedTopic(topic, 1);
        }
 return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
                .messageListener((consumer, msg) -> {
                    log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
                    MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
                            MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
                    ctx.writeAndFlush(mqttPublishMessage);
                })
                .subscriptionName(username).subscribe();
    }

Integration Testing

kafka

We can start a kafka broker for testing through the embedded-kafka-java project. Add the dependency with the following coordinates:

 <dependency>
 <groupId>io.github.embedded-middleware</groupId>
 <artifactId>embedded-kafka-core</artifactId>
 <version>0.0.2</version>
 <scope>test</scope>
 </dependency>

We can start the kafka-based mqtt broker through the following code

@Slf4j
public class MqttKafkaTestUtil {

 public static MqttServer setupMqttKafka() throws Exception {
        EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
 new Thread(() -> {
 try {
                embeddedKafkaServer.start();
            } catch (Exception e) {
                log.error("kafka broker started exception ", e);
            }
        }).start();
        Thread.sleep(5_000);
        MqttServerConfig mqttServerConfig = new MqttServerConfig();
        mqttServerConfig.setPort(0);
        mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
        KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
        kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
        mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
        MqttServer mqttServer = new MqttServer(mqttServerConfig);
 new Thread(() -> {
 try {
                mqttServer.start();
            } catch (Exception e) {
                log.error("mqsar broker started exception ", e);
            }
        }).start();
        Thread.sleep(5000L);
 return mqttServer;
    }

}

The kafka end-to-end test case is relatively simple: subscribe to a topic through the mqtt client, publish a message to it, and check that the message is received

@Log4j2
public class MqttKafkaPubSubTest {

    @Test
 public void pubSubTest() throws Exception {
        MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
        String topic = UUID.randomUUID().toString();
        String content = "test-msg";
        String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
        String clientId = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(UUID.randomUUID().toString());
        connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
        connOpts.setCleanSession(true);
        log.info("Mqtt connecting to broker");
        sampleClient.connect(connOpts);
        CompletableFuture<String> future = new CompletableFuture<>();
        log.info("Mqtt subscribing");
        sampleClient.subscribe(topic, (s, mqttMessage) -> {
            log.info("messageArrived");
            future.complete(mqttMessage.toString());
        });
        log.info("Mqtt subscribed");
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        log.info("Mqtt message publishing");
        sampleClient.publish(topic, message);
        log.info("Mqtt message published");
        TimeUnit.SECONDS.sleep(3);
        sampleClient.disconnect();
        String msg = future.get(5, TimeUnit.SECONDS);
        Assertions.assertEquals(content, msg);
    }

}

pulsar

We can start a pulsar broker for testing through the embedded-pulsar-java project. Add the dependency with the following coordinates:

 <dependency>
 <groupId>io.github.embedded-middleware</groupId>
 <artifactId>embedded-pulsar-core</artifactId>
 <version>0.0.2</version>
 <scope>test</scope>
 </dependency>

We can start the mqtt broker based on pulsar through the following code

@Slf4j
public class MqttPulsarTestUtil {

 public static MqttServer setupMqttPulsar() throws Exception {
        EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
        embeddedPulsarServer.start();
        MqttServerConfig mqttServerConfig = new MqttServerConfig();
        mqttServerConfig.setPort(0);
        mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
        PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
        pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
        pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
        mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
        MqttServer mqttServer = new MqttServer(mqttServerConfig);
 new Thread(() -> {
 try {
                mqttServer.start();
            } catch (Exception e) {
                log.error("mqsar broker started exception ", e);
            }
        }).start();
        Thread.sleep(5000L);
 return mqttServer;
    }
}

The pulsar end-to-end test case is relatively simple: subscribe to a topic through the mqtt client, publish a message to it, and check that the message is received

@Log4j2
public class MqttPulsarPubSubTest {

    @Test
 public void pubSubTest() throws Exception {
        MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
        String topic = UUID.randomUUID().toString();
        String content = "test-msg";
        String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
        String clientId = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(UUID.randomUUID().toString());
        connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
        connOpts.setCleanSession(true);
        log.info("Mqtt connecting to broker");
        sampleClient.connect(connOpts);
        CompletableFuture<String> future = new CompletableFuture<>();
        log.info("Mqtt subscribing");
        sampleClient.subscribe(topic, (s, mqttMessage) -> {
            log.info("messageArrived");
            future.complete(mqttMessage.toString());
        });
        log.info("Mqtt subscribed");
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        log.info("Mqtt message publishing");
        sampleClient.publish(topic, message);
        log.info("Mqtt message published");
        TimeUnit.SECONDS.sleep(3);
        sampleClient.disconnect();
        String msg = future.get(5, TimeUnit.SECONDS);
        Assertions.assertEquals(content, msg);
    }
}

performance optimization

Here we briefly describe several performance optimization points. Tuning of parameters such as the number of threads and buffer sizes is not covered in detail, because those settings have to be determined through performance stress tests.

Using the Epoll network model on linux

public class EventLoopUtil {

 /**
     * @return an EventLoopGroup suitable for the current platform
 */
 public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
 if (Epoll.isAvailable()) {
 return new EpollEventLoopGroup(nThreads, threadFactory);
        } else {
 return new NioEventLoopGroup(nThreads, threadFactory);
        }
    }

 public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
 if (eventLoopGroup instanceof EpollEventLoopGroup) {
 return EpollServerSocketChannel.class;
        } else {
 return NioServerSocketChannel.class;
        }
    }

}

EventLoopUtil uses Epoll.isAvailable() to decide which event loop group to create. When specifying the channel type, the matching channel class is selected by checking the type of the group:

        EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-acceptor"));
        EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-worker"));
        b.group(acceptorGroup, workerGroup)
                // key point
                .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // decoder
                        p.addLast(new MqttDecoder());
                        p.addLast(MqttEncoder.INSTANCE);
                        p.addLast(new MqttHandler(processor(mqttServerConfig)));
                    }
                });

close tcp keepalive

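Since the mqtt protocol has its own heartbeat mechanism, you can turn off tcp keepalive and rely on the heartbeat at the mqtt protocol layer, which saves resources under massive numbers of connections. Set ChannelOption.SO_KEEPALIVE to false. Because the option applies to the accepted client connections rather than to the listening socket, it is set with childOption on the ServerBootstrap

                    .childOption(ChannelOption.SO_KEEPALIVE, false)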

Shorten the timeout

By default, the timeouts involved are relatively long (generally 30s), whether it is the mqtt client in the unit tests or the send timeout of the pulsar producer and kafka producer. If the gateway is deployed in an intranet environment, the timeout can be lowered to around 5s to avoid pointless waiting
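
As an illustration for the kafka side, the sketch below collects shorter producer timeouts in a Properties object that could be merged into the properties built in createProducer. The keys are standard kafka producer settings. The values are only examples for an intranet deployment, and the class name ShortTimeoutSketch is hypothetical. Kafka requires delivery.timeout.ms to be at least linger.ms plus request.timeout.ms, which is why the request timeout is kept below the delivery timeout.

```java
import java.util.Properties;

// Hypothetical helper; the values are examples, not recommendations from the article.
final class ShortTimeoutSketch {

    static Properties shortProducerTimeouts() {
        Properties properties = new Properties();
        // upper bound for how long send() may block, e.g. while waiting for metadata
        properties.setProperty("max.block.ms", "5000");
        // how long the client waits for the response to a single request
        properties.setProperty("request.timeout.ms", "3000");
        // upper bound for reporting success or failure of a send, retries included
        properties.setProperty("delivery.timeout.ms", "5000");
        return properties;
    }

    public static void main(String[] args) {
        shortProducerTimeouts().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

For pulsar, the producer builder has a sendTimeout setting that plays a similar role.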

Use multiple KafkaProducers to optimize performance

A single KafkaProducer can become limited by the bandwidth of its tcp connection. When the request volume is large and kafka produce latency becomes noticeable, consider starting multiple KafkaProducers. Given the characteristics of mqtt traffic (many connections, low qps on each connection), the hash of the mqttSessionKey can be used to decide which KafkaProducer sends the message

Add the following configuration in KafkaProcessorConfig, the number of producers, the default is 1

 private int producerNum = 1;

At initialization time, initialize the Producer array instead of a single Producer

        this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
        for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
            producerArray[i] = createProducer();
        }

Encapsulate a method to get producer

    private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
        return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
    }
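
The index computation can be checked in isolation. The sketch below uses Math.floorMod, which, like the Math.abs(hash % n) expression above, always yields an index in [0, n), including for negative hash codes. ProducerIndexSketch is a hypothetical helper, and the strings stand in for KafkaProducer instances.

```java
// Hypothetical helper showing hash-based selection of one producer out of n.
final class ProducerIndexSketch {

    static int indexFor(Object sessionKey, int producerNum) {
        if (producerNum <= 0) {
            throw new IllegalArgumentException("producerNum must be positive");
        }
        // floorMod never returns a negative value for a positive divisor
        return Math.floorMod(sessionKey.hashCode(), producerNum);
    }

    public static void main(String[] args) {
        String[] producers = {"producer-0", "producer-1", "producer-2"};
        String sessionKey = "user-a/client-1";
        // The same session key always maps to the same producer.
        System.out.println(producers[indexFor(sessionKey, producers.length)]);
        System.out.println(producers[indexFor(sessionKey, producers.length)]);
    }
}
```

Because the same session always maps to the same producer, messages from one connection keep going through a single KafkaProducer.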

epilogue

The code for this article has been uploaded to github. Here we have only implemented the basic mqtt connect, publish and subscribe functions, and do not even support pausing or unsubscribing. A mature commercial mqtt gateway also needs user isolation, broader protocol support, reliability, operability, flow control, security and other capabilities. If you have commercial production-grade mqtt requirements and cannot quickly build a mature mqtt gateway, you can choose the HUAWEI CLOUD IoTDA service, which provides stable and reliable mqtt services, supports massive numbers of devices connecting to the cloud, and offers two-way message communication between devices and the cloud.

 


Posted by LikPan on Fri, 03 Feb 2023 14:17:59 +1030