How to Integrate DolphinDB with MQTT for Industrial IoT Real-Time Data Ingestion

DolphinDB can integrate with MQTT through its plugin to handle device message subscription, stream parsing, real-time computation, and distributed persistence. This approach addresses common Industrial IoT challenges such as protocol integration complexity, high-frequency time-series writes, and real-time alerting.

Technical Specifications at a Glance

| Parameter | Description |
| --- | --- |
| Core Languages | DolphinDB scripts, Bash |
| Messaging Protocol | MQTT 3.x/5.x compatible scenarios |
| Typical Brokers | EMQX, Mosquitto |
| Data Model | Stream tables, distributed partitioned tables, time-series engine |
| Core Dependencies | DolphinDB MQTT Plugin, JSON parsing support |
| Applicable Scenarios | Device telemetry, alert ingestion, real-time monitoring |

DolphinDB and MQTT Work Well for Industrial Real-Time Ingestion

MQTT is a publish-subscribe protocol designed for low-bandwidth and unstable network environments, which makes it a natural fit for sensors, gateways, and edge devices. DolphinDB excels at high-throughput ingestion, stream processing, and time-series analytics. Together, they form a complete pipeline from message ingestion to parsing, computation, and storage.

In industrial environments, the real challenge is not a single connection. It is sustained ingestion, Topic design, QoS selection, and exception handling. DolphinDB’s plugin-based integration model reduces much of this engineering complexity.

The Five Most Important MQTT Concepts

| Concept | Purpose | Example |
| --- | --- | --- |
| Broker | Central message routing service | EMQX, Mosquitto |
| Topic | Routing subject | device/123/data |
| Publisher | Message producer | Sensor, PLC, gateway |
| Subscriber | Message consumer | DolphinDB, application service |
| QoS | Delivery guarantee level | 0 / 1 / 2 |

QoS 0 works well for high-frequency telemetry where limited data loss is acceptable. QoS 1 is better for critical business data but may introduce duplicates. QoS 2 provides the strongest delivery guarantee, but it also has the highest cost. In practice, you need to balance throughput against reliability.
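Which QoS a DolphinDB client uses is set when the connection is opened. A minimal sketch, assuming the plugin's documented mqtt::connect(host, port, [QoS], [formatter], [batchSize]) signature (verify against your plugin release):

```
// Choosing QoS per connection: QoS 0 for bulk telemetry, QoS 1 for alerts
loadPlugin("/opt/dolphindb/server/plugins/mqtt/libPluginMQTT.so")

f = mqtt::createJsonFormatter()  // serialize outgoing tables as JSON
telemetryConn = mqtt::connect("localhost", 1883, 0, f, 100)  // QoS 0, batch 100 rows
alertConn = mqtt::connect("localhost", 1883, 1, f, 1)        // QoS 1, no batching
```

Keeping separate connections per QoS level also makes it easier to monitor and throttle each traffic class independently.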

Plugin Installation and Loading Are the First Steps in the Ingestion Pipeline

On Linux, the standard workflow is to build the plugin from source: generate libPluginMQTT.so and then load it dynamically in DolphinDB.

# Download and build the MQTT plugin
cd /opt/dolphindb/server/plugins
git clone https://github.com/dolphindb/DolphinDBPlugin.git
cd DolphinDBPlugin/mqtt
mkdir build && cd build
cmake ..
make  # Build the shared library

These commands download and compile the DolphinDB MQTT plugin shared library.

// Load the MQTT plugin
loadPlugin("/opt/dolphindb/server/plugins/mqtt/libPluginMQTT.so")

// List the plugin's functions to verify successful loading
defs("mqtt::%")

This script loads the plugin in DolphinDB and verifies that the functions are available.

The Broker Connection Model Determines Security and Stability

A minimal publishing connection only requires the Broker host and port. In production, you should usually add username/password authentication and enable TLS where your plugin build and Broker support it.

// Establish a basic connection
// (plugin signature: mqtt::connect(host, port, [QoS], [formatter], [batchSize], [username], [password]))
loadPlugin("/opt/dolphindb/server/plugins/mqtt/libPluginMQTT.so")
conn = mqtt::connect("localhost", 1883)  // Connect to the local Broker
print("MQTT connection established successfully")

This code creates the minimum viable publishing connection to an MQTT Broker.

Authentication and Encryption Should Be the Default

// Connect with username and password authentication
// (username/password are optional trailing parameters of mqtt::connect)
f = mqtt::createJsonFormatter()
conn = mqtt::connect("broker.example.com", 1883, 1, f, 100, "username", "password")

// TLS: whether the plugin can terminate TLS itself depends on the plugin build.
// If your release exposes no TLS options, a common pattern is to terminate TLS
// at a local bridge or proxy listening on port 8883 and connect the plugin to
// it over localhost.

This example shows the authenticated connection pattern; treat TLS options as version-dependent and check the plugin documentation for your release.

Subscription and Parsing Logic Determine Whether Data Is Immediately Usable

Subscription is not just about receiving messages. It is about mapping messages reliably into a stream table structure. A practical approach is to define the stream table first, then bind the Topic and parsing function.

// Create a stream table for sensor data ingestion
share streamTable(1:0, `device_id`timestamp`temperature`humidity,
    [INT, TIMESTAMP, DOUBLE, DOUBLE]) as sensor_stream

// Build a JSON parser that matches the stream table schema
p = mqtt::createJsonParser([INT, TIMESTAMP, DOUBLE, DOUBLE],
    `device_id`timestamp`temperature`humidity)

// Subscribe to the Topic and write parsed messages into the stream table
// (plugin signature: mqtt::subscribe(host, port, topic, parser, handler))
subConn = mqtt::subscribe("localhost", 1883, "sensor/data", p, sensor_stream)

// Check the current number of received records
select count(*) from sensor_stream

This code maps an MQTT Topic into a DolphinDB stream table through the plugin's JSON parser.
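The handler need not be a table: a unary function receives each parsed batch, which allows lightweight enrichment before appending. A sketch, assuming the plugin's mqtt::subscribe(host, port, topic, parser, handler) signature and a pre-created enriched_stream table with the extra column:

```
// A function handler that enriches each parsed batch before appending.
// enriched_stream is assumed to be a shared stream table with schema
// (device_id INT, timestamp TIMESTAMP, temperature DOUBLE, humidity DOUBLE, ingest_time TIMESTAMP)
def handleBatch(msgTable) {
    t = select *, now() as ingest_time from msgTable  // tag arrival time for latency tracking
    objByName("enriched_stream").append!(t)
}

p = mqtt::createJsonParser([INT, TIMESTAMP, DOUBLE, DOUBLE],
    `device_id`timestamp`temperature`humidity)
mqtt::subscribe("localhost", 1883, "sensor/data", p, handleBatch)
```

Because the handler runs on every batch, keep it cheap; heavy computation belongs in downstream stream engines.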

Wildcard Subscriptions Can Cover Large Device Fleets

+ represents a single-level wildcard, and # represents a multi-level wildcard. They are especially useful for hierarchical Topics such as workshops, production lines, and device groups, for example iot/device/+/data or sensor/#.

// Subscribe to multiple Topic patterns with wildcards
p = mqtt::createJsonParser([INT, TIMESTAMP, DOUBLE, DOUBLE],
    `device_id`timestamp`temperature`humidity)
mqtt::subscribe("localhost", 1883, "sensor/+/temperature", p, sensor_stream)  // Single-level match
mqtt::subscribe("localhost", 1883, "sensor/#", p, sensor_stream)              // Multi-level match
mqtt::subscribe("localhost", 1883, "workshop/A/+/data", p, sensor_stream)     // Workshop-level subscription

This code demonstrates how to subscribe in bulk under a hierarchical MQTT Topic design.

JSON Parsing Functions Should Also Enforce Schema Validation

// createJsonParser enforces the declared column types; value-level checks
// can then be layered in a handler function before rows reach the stream table.
p = mqtt::createJsonParser([INT, TIMESTAMP, DOUBLE, DOUBLE],
    `device_id`timestamp`temperature`humidity)

def validateAndAppend(msgTable) {
    // reject rows with missing keys or physically implausible readings
    valid = select * from msgTable
        where not isNull(device_id), not isNull(timestamp),
        temperature between -40.0:125.0
    objByName("sensor_stream").append!(valid)
}

mqtt::subscribe("localhost", 1883, "sensor/data", p, validateAndAppend)

This code parses JSON messages into typed records and drops rows that fail basic schema and range checks before they are inserted into the stream table.
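To verify parsing end to end, you can publish a test message from DolphinDB itself. A sketch, assuming a local Broker and the plugin's mqtt::createJsonFormatter and mqtt::publish functions:

```
// Round-trip check: publish one JSON message and confirm it reaches the table
f = mqtt::createJsonFormatter()
pubConn = mqtt::connect("localhost", 1883, 0, f, 1)

msg = table(1 as device_id, now() as timestamp, 25.5 as temperature, 50.0 as humidity)
mqtt::publish(pubConn, "sensor/data", msg)

sleep(1000)  // give the subscription a moment to deliver
select count(*) from sensor_stream  // the count should have increased by one
mqtt::close(pubConn)
```

This kind of loopback test is worth scripting once, since it exercises the Broker, the parser, and the stream table in a single pass.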

Real-Time Processing and Persistence Can Form a Closed Loop on One Platform

Industrial IoT is not only about data collection. It also depends on aggregation, anomaly detection, and historical traceability. DolphinDB’s strength lies in its native combination of stream processing and distributed storage.

// Create a real-time stream table for device telemetry
share streamTable(1:0, `device_id`timestamp`temperature`humidity`pressure`vibration`power`status,
    [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, SYMBOL]) as realtime_stream

// Build a parser for the eight-column device schema, then subscribe
parseDeviceData = mqtt::createJsonParser(
    [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, SYMBOL],
    `device_id`timestamp`temperature`humidity`pressure`vibration`power`status)
mqtt::subscribe("localhost", 1883, "iot/device/+/data", parseDeviceData, realtime_stream)

This code sets up a real-time ingestion entry point for industrial device telemetry.

The Time-Series Engine Can Aggregate Data at the Minute Level

// Output table for the aggregation results (time column, key column, metrics)
share streamTable(1:0, `timestamp`device_id`avg_temp`max_temp`sample_count,
    [TIMESTAMP, INT, DOUBLE, DOUBLE, LONG]) as agg_result

// Create a one-minute tumbling-window aggregation engine
avg_engine = createTimeSeriesEngine(name="avg_engine",
    windowSize=60000, step=60000,
    metrics=<[avg(temperature), max(temperature), count(temperature)]>,
    dummyTable=realtime_stream, outputTable=agg_result,
    timeColumn=`timestamp, keyColumn=`device_id)

// Feed the engine from the live stream
subscribeTable(tableName="realtime_stream", actionName="avg_engine",
    offset=-1, handler=append!{avg_engine}, msgAsTable=true)

This code performs minute-level statistical aggregation on the live device stream.
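The same stream can also drive alerting and close the loop back through MQTT. A sketch, assuming realtime_stream from above, a local Broker, and an illustrative 80 °C threshold:

```
// Detect over-temperature rows and publish them back to the Broker as alerts
f = mqtt::createJsonFormatter()
alertConn = mqtt::connect("localhost", 1883, 1, f, 1)  // QoS 1 for alerts

def checkAndAlert(conn, msgTable) {
    alerts = select device_id, timestamp, temperature from msgTable
        where temperature > 80.0  // illustrative threshold
    if (size(alerts) > 0) {
        mqtt::publish(conn, "alert/high/temperature", alerts)
    }
}

subscribeTable(tableName="realtime_stream", actionName="temp_alert",
    offset=-1, handler=checkAndAlert{alertConn}, msgAsTable=true)
```

Partial application (checkAndAlert{alertConn}) binds the connection once so the handler itself stays a clean unary function.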

// Persist data into a distributed partitioned table
schema = table(1:0, `device_id`timestamp`temperature`humidity`pressure`vibration`power`status,
    [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, SYMBOL])

// Composite partitioning: VALUE by date, then VALUE by device
dbDate = database("", VALUE, 2024.01.01..2024.12.31)
dbDevice = database("", VALUE, 1..100)
db = database("dfs://iot_data", COMPO, [dbDate, dbDevice])
db.createPartitionedTable(schema, `sensor_data, `timestamp`device_id)

This code creates a distributed persistence structure for time-series device data.
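Ingestion and persistence are then wired together by subscribing the partitioned table to the stream. A sketch, assuming the structures above:

```
// Route the live stream into the distributed table in batches
sensor_dfs = loadTable("dfs://iot_data", "sensor_data")
subscribeTable(tableName="realtime_stream", actionName="persist_iot",
    offset=-1, handler=append!{sensor_dfs}, msgAsTable=true,
    batchSize=10000, throttle=1)  // flush every 10,000 rows or 1 second
```

Batching the writes keeps DFS transaction overhead low without adding more than about a second of persistence latency.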

Broker Configuration and Operations Strategy Directly Affect Throughput Limits

EMQX and Mosquitto are both solid choices. EMQX is more enterprise-oriented, while Mosquitto is more lightweight. In test environments, you can bring up either Broker quickly with Docker.

# Start EMQX
docker run -d --name emqx \
  -p 1883:1883 -p 8883:8883 -p 18083:18083 \
  emqx/emqx:latest

# Start Mosquitto
docker run -d --name mosquitto \
  -p 1883:1883 -p 9001:9001 \
  eclipse-mosquitto

These commands provision a test-ready MQTT Broker environment.

You should pay close attention to four parameters: maximum connections, message size, keepalive interval, and session expiry. Under high concurrency, you should also reduce backpressure through batched subscriptions, queue buffering, and multithreaded consumption.
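For Mosquitto, these knobs map onto mosquitto.conf entries (keepalive itself is negotiated per client, not set Broker-wide). An illustrative fragment; verify key names and values against your Mosquitto version:

```
# Illustrative mosquitto.conf tuning for high-concurrency ingestion
max_connections -1               # unlimited; bounded by OS file-descriptor limits
message_size_limit 262144        # cap payloads at 256 KB
max_inflight_messages 100        # QoS 1/2 messages in flight per client
max_queued_messages 10000        # per-client queue depth under backpressure
persistent_client_expiration 1d  # expire stale persistent sessions after a day
```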

Connection Monitoring and Automatic Reconnection Are Mandatory in Production

// Connection function with retry logic
def connectWithRetry(host, port, maxRetries=5) {
    retryCount = 0
    while (retryCount < maxRetries) {
        try {
            return mqtt::connect(host, port)  // Attempt to connect
        } catch(ex) {
            retryCount += 1
            sleep(5000)  // Wait 5 seconds before retrying
        }
    }
    throw "MQTT connection failed after " + string(maxRetries) + " attempts"
}

This code implements automatic reconnection when the Broker is unstable.
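Reconnection is most useful when paired with a periodic liveness check. A sketch using a background job, with an illustrative staleness rule based on the newest timestamp in the stream table:

```
// Flag a stalled subscription when no rows have arrived for over a minute
def monitorIngestion() {
    do {
        latest = exec max(timestamp) from objByName("sensor_stream")
        if (isNull(latest) or now() - latest > 60000) {
            writeLog("MQTT ingestion stalled: no new rows in the last minute")
            // trigger connectWithRetry / re-subscription here
        }
        sleep(60000)  // check once per minute
    } while (true)
}

submitJob("mqtt_health", "MQTT ingestion health check", monitorIngestion)
```

Watching data freshness rather than connection state catches the common failure mode where the TCP session stays up but messages stop flowing.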

Best Practices Should Standardize Topic Design, Message Format, and Reliability

Your Topics should reflect hierarchy such as tenant, workshop, device, and data type, instead of using flat names without semantics. Recommended patterns include device/{device_id}/data and alert/{level}/{type}. This design improves access isolation, wildcard subscriptions, and future routing extensibility.

For message format, JSON is a practical default. Include four layers of information: device ID, timestamp, business payload, and metadata. This design simplifies parsing and makes it easier to extend the schema later with fields such as battery level, firmware version, and network status.

{
  "device_id": 1,
  "timestamp": "2024-01-15T10:30:00.123Z",
  "data": {
    "temperature": 25.5,
    "humidity": 50.0,
    "pressure": 1013.25
  },
  "metadata": {
    "firmware": "v1.2.3",
    "battery": 85
  }
}

This JSON example shows a standard message structure suitable for industrial device telemetry.

FAQ

Q: Which QoS level should I prioritize when integrating DolphinDB with MQTT?

A: Use QoS 0 first for high-frequency sensor data when throughput matters most. Use QoS 1 for critical status updates or alerts. Only consider QoS 2 in strongly consistent scenarios, because it has the highest overhead.

Q: How should I design Topics for future extensibility?

A: Use a layered pattern such as business-domain/location/device/data-type, for example iot/workshopA/device123/data. This structure makes wildcard subscriptions, access control, and cross-system routing much easier.

Q: How can I prevent ingestion from becoming a persistence bottleneck?

A: Use stream tables to decouple ingestion from storage, enable batch processing and persistent queues, and design partitioned tables by time and device dimensions. This can significantly improve throughput and stability.
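The decoupling described in this answer can also be made restart-safe by persisting the stream table itself. A sketch, assuming persistenceDir is set in the server configuration:

```
// Back the stream table with an on-disk queue so a restart does not lose
// unconsumed messages
st = streamTable(1:0, `device_id`timestamp`temperature`humidity,
    [INT, TIMESTAMP, DOUBLE, DOUBLE])
enableTableShareAndPersistence(table=st, tableName="sensor_stream_persist",
    asynWrite=true, compress=true, cacheSize=1000000)
```

Asynchronous, compressed persistence keeps the write path fast while bounding the in-memory cache to the most recent rows.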

Key Takeaways

This guide walks through a DolphinDB MQTT ingestion architecture covering plugin installation, Broker connection, Topic subscription, JSON parsing, stream table processing, persistent storage, and operational optimization. It helps developers quickly build a highly reliable real-time data ingestion pipeline for Industrial IoT.