Introduction
In this article, I’ll share my journey of implementing an MQTT 5.0 client in Go. We’ll cover protocol fundamentals, connection and session management, and, of course, publishing and receiving messages to and from a broker.
Why build an MQTT client when libraries like Paho already exist? Because I enjoy writing clients: they're a playground for fun concepts like protocol parsing, networking, concurrency, and performance tuning.
- GitHub repository: https://github.com/MonsieurTib/gmqtt
- MQTT 5.0 specification: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
Part I: Connecting to the Broker
1. Understanding MQTT 5 Data Types
MQTT is a standard messaging protocol for the Internet of Things (IoT). It's designed as a lightweight publish/subscribe transport for connecting remote devices with a small code footprint and minimal network bandwidth.
MQTT 5 defines several data types that are important when building a client from scratch:
- 2 Byte Integer
- 4 Byte Integer
- Variable Byte Integer
- Binary Data
- UTF-8 Encoded String
- UTF-8 String pair
Implementing these correctly is the first step toward building a functional MQTT client in Go.
I implemented the following functions, which will be used for encoding and decoding MQTT packets.
1.a - 2 Byte Integer
```go
// MQTT integers are big-endian: most significant byte first.
func encodeUint16(buf *bytes.Buffer, n uint16) error {
	_, err := buf.Write([]byte{byte(n >> 8), byte(n)})
	return err
}

func decodeUint16(buf *bytes.Buffer) (uint16, error) {
	b := buf.Next(2)
	if len(b) < 2 {
		return 0, fmt.Errorf("not enough bytes")
	}
	return uint16(b[0])<<8 | uint16(b[1]), nil
}
```
1.b - 4 Byte Integer
```go
func encodeUint32(buf *bytes.Buffer, n uint32) error {
	_, err := buf.Write([]byte{
		byte(n >> 24),
		byte(n >> 16),
		byte(n >> 8),
		byte(n),
	})
	return err
}

func decodeUint32(buf *bytes.Buffer) (uint32, error) {
	b := buf.Next(4)
	if len(b) < 4 {
		return 0, fmt.Errorf("not enough bytes")
	}
	return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
```
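Both helpers write integers in big-endian (network) byte order. For comparison, here is a small sketch using the standard library's encoding/binary package, which offers the same conversions. It assumes Go 1.19 or later for the Append* helpers, and appendUint16/appendUint32 are illustrative names, not part of gmqtt.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendUint16 appends n as an MQTT Two Byte Integer (big-endian).
func appendUint16(dst []byte, n uint16) []byte {
	return binary.BigEndian.AppendUint16(dst, n)
}

// appendUint32 appends n as an MQTT Four Byte Integer (big-endian).
func appendUint32(dst []byte, n uint32) []byte {
	return binary.BigEndian.AppendUint32(dst, n)
}

func main() {
	b := appendUint16(nil, 0x1234)
	b = appendUint32(b, 0xDEADBEEF)
	fmt.Printf("% x\n", b) // 12 34 de ad be ef

	// Decoding reads the same bytes back.
	fmt.Println(binary.BigEndian.Uint16(b[0:2]) == 0x1234)     // true
	fmt.Println(binary.BigEndian.Uint32(b[2:6]) == 0xDEADBEEF) // true
}
```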
1.c - Variable Byte Integer
This one is less straightforward. From the specification:
“The Variable Byte Integer is encoded using an encoding scheme which uses a single byte for values up to 127. Larger values are handled as follows. The least significant seven bits of each byte encode the data, and the most significant bit is used to indicate whether there are bytes following in the representation. Thus, each byte encodes 128 values and a ‘continuation bit’. The maximum number of bytes in the Variable Byte Integer field is four. The encoded value MUST use the minimum number of bytes necessary to represent the value.”
It’s an efficient way to represent values from 0 to 268,435,455 while using as few bytes as possible.
For example, to represent a value between 0 and 127, you only need one byte. For values between 128 and 16,383, you need two bytes. And so on, up to four bytes for the maximum range.
Here’s a table to illustrate:
| Bytes | From | To |
|---|---|---|
| 1 | 0 (0x00) | 127 (0x7F) |
| 2 | 128 (0x80, 0x01) | 16,383 (0xFF, 0x7F) |
| 3 | 16,384 (0x80, 0x80, 0x01) | 2,097,151 (0xFF, 0xFF, 0x7F) |
| 4 | 2,097,152 (0x80, 0x80, 0x80, 0x01) | 268,435,455 (0xFF, 0xFF, 0xFF, 0x7F) |
Lucky for us, the specification provides the following pseudocode :-)
```
do
    encodedByte = X MOD 128
    X = X DIV 128
    // if there are more data to encode, set the top bit of this byte
    if (X > 0)
        encodedByte = encodedByte OR 128
    endif
    'output' encodedByte
while (X > 0)
```
Here MOD is the modulo operator (% in C), DIV is integer division (/ in C), and OR is bitwise OR (| in C).
This translates directly into the following Go code:
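```go
// encodeVariableByteInteger writes length as an MQTT Variable Byte Integer.
// The caller must ensure 0 <= length <= 268,435,455 (the four-byte maximum).
func encodeVariableByteInteger(buf *bytes.Buffer, length int) {
	for {
		encodedByte := byte(length % 128) // low 7 bits of the remaining value
		length /= 128
		if length > 0 {
			encodedByte |= 0x80 // continuation bit: more bytes follow
		}
		buf.WriteByte(encodedByte)
		if length <= 0 {
			break
		}
	}
}
```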
Explanation: This function encodes an integer into the Variable Byte Integer format and writes it to a buffer. It repeatedly takes the value modulo 128 to extract the lower 7 bits, sets the continuation bit (0x80) if more bytes are needed, and writes each byte to the buffer. The loop ends once the remaining value reaches zero.
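Some examples of encoded values:
value: 127
binary: 0111 1111 (0x7F)
The most significant bit (the continuation bit) is 0, so no more bytes follow. Since 127 fits in 7 bits, a single byte is enough.
value: 349 (= 2 × 128 + 93)
binary: 1101 1101 | 0000 0010 (0xDD 0x02)
- first byte: continuation bit = 1 (more bytes follow); the remaining 7 bits, 101 1101, encode 93
- second byte: continuation bit = 0 (no more bytes); the remaining 7 bits encode 2, which is weighted by 128
- decoded value: 93 + 2 × 128 = 349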
The following function decodes a Variable Byte Integer:
```go
func decodeVariableByteInteger(r io.Reader) (int, error) {
	value := 0
	multiplier := 1
	buf := make([]byte, 1)
	for {
		// io.ReadFull guarantees exactly one byte or an error; a bare r.Read
		// may legally return 0 bytes with a nil error.
		if _, err := io.ReadFull(r, buf); err != nil {
			return 0, err
		}
		value += int(buf[0]&0x7F) * multiplier
		if multiplier > 128*128*128 {
			return 0, errors.New("malformed Variable Byte Integer (too long)")
		}
		if buf[0]&0x80 == 0 {
			break
		}
		multiplier *= 128
	}
	return value, nil
}
```
Explanation: The function reads a Variable Byte Integer from an io.Reader one byte at a time. Each byte contributes 7 bits of data, and the most significant bit (MSB) acts as a continuation flag: if it's set to 1, another byte follows; if it's 0, the value is complete.
On each iteration, the function:
- Masks out the MSB (buf[0] & 0x7F) to get the data bits.
- Adds them to the accumulated value, scaled by a multiplier (1, 128, 128², 128³).
- Checks the MSB: if it is 0, decoding is done; otherwise the multiplier is multiplied by 128 and the next byte is read.
If a fifth byte is read, the multiplier has grown past 128³ and the function returns an error, because the MQTT spec limits Variable Byte Integers to four bytes.
1.d - Binary Data
Binary Data consists of a Two Byte Integer indicating its length, followed by that many bytes, which limits its size to 0–65,535 bytes.
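```go
func encodeBinary(buf *bytes.Buffer, data []byte) error {
	// The length prefix is a Two Byte Integer, so longer data cannot be represented.
	if len(data) > 0xFFFF {
		return fmt.Errorf("binary data too long: %d bytes (max 65,535)", len(data))
	}
	if err := encodeUint16(buf, uint16(len(data))); err != nil {
		return err
	}
	buf.Write(data)
	return nil
}

func decodeBinary(buf *bytes.Buffer) ([]byte, error) {
	l, err := decodeUint16(buf)
	if err != nil {
		return nil, err
	}
	if l == 0 {
		return []byte{}, nil
	}
	result := make([]byte, l)
	// io.ReadFull returns io.ErrUnexpectedEOF if fewer than l bytes remain;
	// bytes.Buffer.Read would silently return a short read.
	if _, err := io.ReadFull(buf, result); err != nil {
		return nil, fmt.Errorf("failed to read binary data: %w", err)
	}
	return result, nil
}
```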
1.e - UTF-8 Encoded String
The specification says: “string data is prefixed with a two byte length field that gives the number of bytes in the UTF-8 encoded string itself. Consequently, the maximum size of a UTF-8 Encoded String is 65,535 bytes. Example: given the string ‘hello’, the encoded string will be 0x00 0x05 0x68 0x65 0x6c 0x6c 0x6f. The first two bytes 0x00 0x05 represent the length of the string ‘hello’ and the remaining bytes are the UTF-8 encoded string.”
The functions for encoding and decoding UTF-8 strings are straightforward and use the 2 byte integer and binary encode/decode functions we discussed earlier.
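```go
func encodeString(buf *bytes.Buffer, str string) error {
	// len(str) counts bytes, not characters, which is what the length prefix holds.
	if len(str) > 0xFFFF {
		return fmt.Errorf("string too long: %d bytes (max 65,535)", len(str))
	}
	if err := encodeUint16(buf, uint16(len(str))); err != nil {
		return err
	}
	buf.WriteString(str)
	return nil
}

// A UTF-8 Encoded String has the same wire layout as Binary Data.
func decodeString(buf *bytes.Buffer) (string, error) {
	b, err := decodeBinary(buf)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```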
1.f - UTF-8 String pair
A UTF-8 String Pair consists of two UTF-8 Encoded Strings (see section 1.e) and is used to store name-value pairs. The first string is the name, and the second is the value.
Now that we have all the necessary helper functions, we can implement the connection to the broker.
2. Connecting to the broker
Once a network connection to the broker is established (typically over TCP or WebSocket), the client must send a CONNECT packet to initiate a session.
The MQTT 5 CONNECT packet contains essential information for establishing a connection between the client and broker:
Client Identifier: a unique ID that identifies the client to the broker. If it is empty, the broker may assign one and return it in the CONNACK packet's Assigned Client Identifier property.
Clean Start: a flag that determines whether the broker should start a fresh session or resume a previous one with its stored state (subscriptions, queued messages, etc.).
Keep Alive: the maximum interval, in seconds, that may elapse between two control packets sent by the client. If the client has nothing else to send within that period, it sends a PINGREQ packet to keep the connection alive, and the broker closes the connection if it hears nothing from the client within one and a half times the Keep Alive. A value of 0 disables the keep-alive mechanism.
Authentication: optional username and password fields for broker authentication, each signaled by its own flag in the Connect Flags byte.
Will Message: an optional message that the broker publishes on behalf of the client if the client disconnects ungracefully. This includes:
- Will Topic: where the message will be published
- Will Payload: the message content
- Will QoS: Quality of Service level (0, 1, or 2)
- Will Retain: whether the message should be retained by the broker
- Will Properties: MQTT 5 specific properties for the Will Message
Properties: MQTT 5 introduces properties that provide additional metadata and capabilities, such as:
- Session Expiry Interval
- Request/Response Information
- Maximum Packet Size
- User Properties (custom key-value pairs)
These fields work together to establish a reliable, authenticated connection with appropriate session handling and failure recovery mechanisms.
Like most MQTT control packets (e.g. PUBLISH and SUBSCRIBE), the CONNECT packet consists of three sections:
- Fixed Header
- Variable Header
- Payload
2.a. Fixed Header
The fixed header is present in every MQTT control packet. It always starts with one byte holding the packet type and flags, followed by one to four bytes holding the Remaining Length.
| Field | Size | Description |
|---|---|---|
| Control Packet Type and Flags | 1 byte | Bits 7–4 identify the packet type (e.g. 1 for CONNECT, 3 for PUBLISH). Bits 3–0 are reserved for flags specific to each packet type. |
| Remaining Length | 1–4 bytes | Encoded as a Variable Byte Integer, it specifies the total number of bytes in the Variable Header and Payload. |
For example, the first byte of a CONNECT packet is always 0x10 (binary 0001 0000), where:
- Bits 7–4 (0001) = Packet type 1 → CONNECT
- Bits 3–0 (0000) = Flags (must be 0 for CONNECT)
The Remaining Length is simply the total of the variable header and payload sizes, encoded using the Variable Byte Integer function we wrote earlier.
2.b. Variable Header
As its name suggests, the variable header structure and content vary depending on the packet type.
For the CONNECT packet, the variable header has a well-defined structure that looks like this:
| Field | Type | Description |
|---|---|---|
| Protocol Name | UTF-8 String | Always "MQTT". Identifies the protocol being used. |
| Protocol Level | Byte | Indicates the protocol version. For MQTT 5.0, this value is 5. |
| Connect Flags | Byte | Bit flags defining session behavior (e.g. Clean Start), Will Message options, and authentication fields (Username/Password). |
| Keep Alive | 2 Bytes (UInt16) | Maximum time interval, in seconds, between control packets sent by the client. |
| Properties | Variable Byte Integer + Property List | MQTT 5 introduces properties that provide extensibility and optional connection parameters. |
Connect Flags Bit Layout
The Connect Flags byte packs several settings into individual bits: five single-bit flags, the two-bit Will QoS, and one reserved bit. This is far more compact than spending a whole byte on each setting.
Here’s the breakdown of each bit (from most significant to least):
| Bit | Name | Description |
|---|---|---|
| 7 | Username Flag | Set to 1 if a Username is present in the payload. |
| 6 | Password Flag | Set to 1 if a Password is present in the payload. |
| 5 | Will Retain | Set to 1 if the Will Message should be retained. |
| 4–3 | Will QoS | Quality of Service level for the Will Message (0–2). |
| 2 | Will Flag | Set to 1 if a Will Message is included. |
| 1 | Clean Start | If 1, the broker discards any existing session and starts a new one; if 0, it resumes the existing session (or creates a new one if none exists). |
| 0 | Reserved | Must always be 0. |
The Connect Flags are encoded using the following function:
```go
func (c *Connect) encodeConnectFlags() byte {
	var flags byte
	if c.cleanStart {
		flags |= 0x02 // bit 1: Clean Start
	}
	if c.willFlag {
		flags |= 0x04          // bit 2: Will Flag
		flags |= c.willQoS << 3 // bits 4-3: Will QoS
		if c.willRetain {
			flags |= 0x20 // bit 5: Will Retain
		}
	}
	if c.username != "" {
		flags |= 0x80 // bit 7: Username Flag
	}
	if len(c.password) > 0 {
		flags |= 0x40 // bit 6: Password Flag
	}
	return flags
}
```
Explanation: We start from an empty byte (00000000), then, for each flag that needs to be set, use a bitwise OR to set the corresponding bit to 1.
For example, if willFlag is true, we need to set bit 2 (counting from 0, right to left). We OR the flags byte with 0x04 (binary 00000100) to set this bit. The |= operator performs a bitwise OR and assigns the result back to flags.
For the Will QoS level (0-2), we use a left shift operation (<< 3) to position the QoS value at bits 3-4. For instance, if willQoS is 2 (00000010), shifting it left by 3 positions gives us 00010000 (0x10).
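Decoding works the other way round: AND the byte with a mask to test a single bit, and shift right before masking to extract the two-bit Will QoS. A client never needs to decode its own CONNECT flags, so this is purely an illustrative sketch (connectFlags and decodeConnectFlags are hypothetical), but it doubles as a check on the bit positions in the table above. It also rejects combinations the specification forbids: a set reserved bit, a Will QoS of 3, or Will QoS/Retain set without the Will Flag.

```go
package main

import (
	"errors"
	"fmt"
)

// connectFlags mirrors the fields used by encodeConnectFlags.
type connectFlags struct {
	username, password, willRetain, willFlag, cleanStart bool
	willQoS                                              byte
}

func decodeConnectFlags(b byte) (connectFlags, error) {
	if b&0x01 != 0 {
		return connectFlags{}, errors.New("reserved bit 0 must be 0")
	}
	f := connectFlags{
		username:   b&0x80 != 0,     // bit 7
		password:   b&0x40 != 0,     // bit 6
		willRetain: b&0x20 != 0,     // bit 5
		willQoS:    (b >> 3) & 0x03, // bits 4-3
		willFlag:   b&0x04 != 0,     // bit 2
		cleanStart: b&0x02 != 0,     // bit 1
	}
	if f.willQoS > 2 {
		return connectFlags{}, errors.New("will QoS must be 0, 1 or 2")
	}
	if !f.willFlag && (f.willQoS != 0 || f.willRetain) {
		return connectFlags{}, errors.New("will QoS and Will Retain require the Will Flag")
	}
	return f, nil
}

func main() {
	// 0xEE = 1110 1110: username, password, will retain, Will QoS 1,
	// will flag and clean start set; reserved bit 0 clear.
	f, err := decodeConnectFlags(0xEE)
	fmt.Printf("%+v %v\n", f, err)
}
```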
Encoding the Variable Header
Now that we understand the structure, let’s look at how to encode the complete variable header:
```go
func (c *Connect) encodeVariableHeader() ([]byte, error) {
	var buf bytes.Buffer
	if err := encodeString(&buf, c.protocolName); err != nil {
		return nil, err
	}
	buf.WriteByte(c.protocolVersion)
	buf.WriteByte(c.encodeConnectFlags())
	err := encodeUint16(&buf, c.keepAlive)
	if err != nil {
		return nil, err
	}
	if c.properties == nil {
		buf.WriteByte(0x00)
	} else {
		propBytes, err := c.properties.Encode()
		if err != nil {
			return nil, err
		}
		buf.Write(propBytes)
	}
	return buf.Bytes(), nil
}
```
Properties Encoding
MQTT 5 properties are encoded as:
- A Variable Byte Integer representing the total length of all properties
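- A sequence of property identifier + property value pairs (the identifier is formally a Variable Byte Integer, but every identifier defined in MQTT 5.0 fits in a single byte)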
Each property has a unique identifier (e.g., 0x11 for Session Expiry Interval, 0x21 for Receive Maximum). Here’s a simplified example:
```go
func (cp *ConnectProperties) Encode() ([]byte, error) {
	var propsBuf bytes.Buffer
	if cp.SessionExpiryInterval != nil {
		propsBuf.WriteByte(0x11)
		err := encodeUint32(&propsBuf, *cp.SessionExpiryInterval)
		if err != nil {
			return nil, err
		}
	}
	if cp.ReceiveMaximum != nil {
		propsBuf.WriteByte(0x21)
		err := encodeUint16(&propsBuf, *cp.ReceiveMaximum)
		if err != nil {
			return nil, err
		}
	}
	// code removed for brevity

	var finalBuf bytes.Buffer
	encodeVariableByteInteger(&finalBuf, propsBuf.Len())
	finalBuf.Write(propsBuf.Bytes())
	return finalBuf.Bytes(), nil
}
```
2.c. Payload
The payload contains the actual connection data referenced by the Connect Flags:
| Field | Presence Condition | Type | Description |
|---|---|---|---|
| Client ID | Always present | UTF-8 String | Unique client identifier |
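| Will Properties | If Will Flag = 1 | Variable Byte Integer + Property List | Properties for the will message (e.g. Will Delay Interval) |
| Will Topic | If Will Flag = 1 | UTF-8 String | Topic for will message |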
| Will Payload | If Will Flag = 1 | Binary Data | Will message content |
| Username | If Username Flag = 1 | UTF-8 String | Authentication username |
| Password | If Password Flag = 1 | Binary Data | Authentication password |
The payload fields must appear in exactly this order. Here's the function I wrote to encode them:
```go
func (c *Connect) encodePayload() ([]byte, error) {
	var buf bytes.Buffer
	if err := encodeString(&buf, c.clientID); err != nil {
		return nil, err
	}
	if c.willFlag {
		if c.willProperties == nil {
			buf.WriteByte(0x00)
		} else {
			propBytes, err := c.willProperties.Encode()
			if err != nil {
				return nil, err
			}
			buf.Write(propBytes)
		}
		if err := encodeString(&buf, c.willTopic); err != nil {
			return nil, err
		}
		if err := encodeBinary(&buf, []byte(c.willMessage)); err != nil {
			return nil, err
		}
	}
	if c.username != "" {
		if err := encodeString(&buf, c.username); err != nil {
			return nil, err
		}
	}
	if len(c.password) > 0 {
		if err := encodeBinary(&buf, c.password); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}
```
Putting it all together:
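```go
func (c *Connect) Encode() (net.Buffers, error) {
	var buf bytes.Buffer
	buf.WriteByte(TypeConnect << 4) // first fixed-header byte: packet type 1, flags 0
	header, err := c.encodeVariableHeader()
	if err != nil {
		return nil, err
	}
	payload, err := c.encodePayload()
	if err != nil {
		return nil, err
	}
	remainingLength := len(payload) + len(header)
	encodeVariableByteInteger(&buf, remainingLength)
	// net.Buffers lets the three slices be written without first copying them
	// into one; on supported connections this becomes a single writev call.
	return net.Buffers{buf.Bytes(), header, payload}, nil
}
```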
After the client sends a well-formed CONNECT packet, the broker responds with a CONNACK packet whose variable header contains the Connect Acknowledge Flags (bit 0 is the Session Present flag, which tells the client whether the broker resumed an existing session) and a Reason Code. The CONNACK packet may also include properties describing the server's capabilities, such as Maximum QoS, and it can override certain values the client specified in the CONNECT packet. For example, if the broker returns the Server Keep Alive property, that value replaces the Keep Alive the client sent in CONNECT.
2.d. Testing
For testing, I use the excellent testcontainers library, running EMQX as the MQTT broker and Redis as the backend for username/password authentication. Here's the code that sets up the secured broker, followed by the connection tests:
```go
func setupProtectedEMQXContainer(ctx context.Context) (*MQTTContainer, error) {
	_, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
		NetworkRequest: testcontainers.NetworkRequest{
			Name: "emqx-test-network",
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create network: %w", err)
	}

	redisContainer, err := testcontainers.GenericContainer(
		ctx,
		testcontainers.GenericContainerRequest{
			ContainerRequest: testcontainers.ContainerRequest{
				Image:        "redis:7-alpine",
				ExposedPorts: []string{"6379/tcp"},
				WaitingFor:   wait.ForListeningPort("6379/tcp"),
				Networks:     []string{"emqx-test-network"},
				NetworkAliases: map[string][]string{
					"emqx-test-network": {"redis"},
				},
			},
			Started: true,
		},
	)
	if err != nil {
		return nil, fmt.Errorf("failed to setup Redis: %w", err)
	}
	time.Sleep(2 * time.Second)

	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:        "emqx/emqx:5.3.0",
			ExposedPorts: []string{"1883/tcp", "18083/tcp"},
			Networks:     []string{"emqx-test-network"},
			WaitingFor: wait.ForAll(
				wait.ForLog("EMQX").WithStartupTimeout(10*time.Second),
				wait.ForListeningPort("1883/tcp"),
				wait.ForListeningPort("18083/tcp"),
			),
			Env: map[string]string{
				"EMQX_ALLOW_ANONYMOUS": "false",
				"EMQX_LOG__LEVEL": "warning",
				"EMQX_AUTHENTICATION__1__MECHANISM": "password_based",
				"EMQX_AUTHENTICATION__1__BACKEND": "redis",
				"EMQX_AUTHENTICATION__1__REDIS_TYPE": "single",
				"EMQX_AUTHENTICATION__1__SERVER": "redis:6379",
				"EMQX_AUTHENTICATION__1__PASSWORD_HASH_ALGORITHM__NAME": "sha256",
				"EMQX_AUTHENTICATION__1__PASSWORD_HASH_ALGORITHM__SALT_POSITION": "suffix",
				"EMQX_AUTHENTICATION__1__CMD": "HMGET mqtt_user:${username} password_hash salt is_superuser",
				"EMQX_AUTHENTICATION__1__DATABASE": "0",
				"EMQX_AUTHENTICATION__1__AUTO_RECONNECT": "true",
			},
		},
		Started: true,
	})
	if err != nil {
		return nil, err
	}

	mqttPort, err := container.MappedPort(ctx, "1883")
	if err != nil {
		return nil, err
	}
	dashboardPort, err := container.MappedPort(ctx, "18083")
	if err != nil {
		return nil, err
	}
	hostIP, err := container.Host(ctx)
	if err != nil {
		return nil, err
	}
	mqttURI := fmt.Sprintf("%s:%s", hostIP, mqttPort.Port())
	dashboardURL := fmt.Sprintf("http://%s:%s", hostIP, dashboardPort.Port())

	err = createRedisUser(ctx, redisContainer)
	if err != nil {
		return nil, fmt.Errorf("failed to populate Redis user data: %w", err)
	}
	return &MQTTContainer{
		Container:    container,
		MQTTURI:      mqttURI,
		DashboardURL: dashboardURL,
	}, nil
}
```
```go
func TestCreateConnectionWithAuth(t *testing.T) {
	ctx := context.Background()
	container, err := setupProtectedEMQXContainer(ctx)
	if err != nil {
		t.Fatalf("Failed to setup protected EMQX container: %v", err)
	}
	defer func() {
		if err := container.Terminate(ctx); err != nil {
			t.Logf("Failed to terminate container: %v", err)
		}
	}()
	t.Logf("EMQX Dashboard available at: %s", container.DashboardURL)
	t.Logf("Default login: admin/public")
	t.Logf("Test user created: testuser/testpass")

	tests := []struct {
		name        string
		username    string
		password    string
		clientID    string
		expectError bool
		expectedErr error
		description string
	}{
		{
			name:        "valid_credentials",
			username:    "testuser",
			password:    "testpass",
			clientID:    "test-valid-connection",
			expectError: false,
			expectedErr: nil,
			description: "Valid credentials should connect successfully",
		},
		{
			name:        "invalid_username",
			username:    "testuser2",
			password:    "testpass",
			clientID:    "test-invalid-username",
			expectError: true,
			// NOTE: EMQX returns "not authorized" for non-existent users, not "bad username". TODO: open an issue.
			expectedErr: ErrConnackNotAuthorized,
			description: "Invalid username should fail with ErrConnackNotAuthorized (EMQX behavior)",
		},
		{
			name:        "invalid_password",
			username:    "testuser",
			password:    "wrongpass",
			clientID:    "test-invalid-password",
			expectError: true,
			expectedErr: ErrConnackBadUsernameOrPassword,
			description: "Invalid password should fail with ErrConnackBadUsernameOrPassword",
		},
		{
			name:        "anonymous_connection",
			username:    "",
			password:    "",
			clientID:    "test-anonymous-conn",
			expectError: true,
			expectedErr: ErrConnackBadUsernameOrPassword,
			description: "Anonymous connection should fail with ErrConnackBadUsernameOrPassword",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			config := &ClientConfig{
				Broker:    container.MQTTURI,
				ClientID:  tt.clientID,
				Username:  tt.username,
				Password:  tt.password,
				KeepAlive: 60 * time.Second,
			}
			client, err := NewClient(config)
			if err != nil {
				t.Fatalf("Failed to create client: %v", err)
			}
			err = client.Connect(ctx)
			if tt.expectError {
				if err == nil {
					t.Errorf("%s: expected connection to fail but it succeeded", tt.description)
				} else {
					if tt.expectedErr != nil && !errors.Is(err, tt.expectedErr) {
						t.Errorf("%s: expected error %v, but got %v", tt.description, tt.expectedErr, err)
					} else {
						t.Logf("%s: correctly failed with expected error type: %v", tt.description, err)
					}
				}
			} else {
				if err != nil {
					t.Errorf("%s: expected connection to succeed but it failed: %v", tt.description, err)
				} else {
					t.Logf("%s: successfully connected", tt.description)
					if !client.connected {
						t.Errorf("Client.Connected should be true after successful connection")
					}
					defer client.Disconnect(ctx)
				}
			}
		})
	}
}
```
What’s Next?
In this first part, we've built the foundation of our MQTT 5.0 client by implementing the core data types and the complete encoding of the CONNECT packet, and we've looked at the CONNACK response the broker sends back. We can now establish authenticated connections to an MQTT 5.0 broker.
In Part II, we’ll cover:
- Publishing messages with different QoS levels
- Implementing proper session management
The complete implementation is available on GitHub. Feel free to explore the code, open issues, or contribute!