
MQTT Trigger Node
Overview
The MQTT Trigger Node automatically initiates MaestroHub pipelines when messages arrive on subscribed MQTT topics. Unlike the MQTT Subscribe connector node which receives messages within an already-running pipeline, the MQTT Trigger starts new pipeline executions in response to incoming messages—enabling fully event-driven automation.
Core Functionality
What It Does
MQTT Trigger enables real-time, event-driven pipeline execution by:
1. Event-Driven Pipeline Execution
Start pipelines automatically when MQTT messages arrive, without manual intervention or polling. Perfect for IoT sensor ingestion, command processing, and real-time data flows.
2. Automatic Subscription Management
Subscriptions are created and cleaned up automatically—no manual subscription management required.
3. Message Payload Passthrough
Incoming MQTT message payloads are passed directly to the pipeline, making them available to all downstream nodes via the $input variable.
Reconnection Handling
MaestroHub automatically handles connection disruptions to ensure reliable message delivery.
Automatic Recovery
When an MQTT connection is lost and restored:
- Connection Lost: The system detects the disconnection automatically
- Connection Restored: Subscriptions are automatically re-established within seconds
- Transparent Recovery: Pipelines continue to receive messages once the connection is restored—no manual intervention required
What This Means for Your Workflows
| Scenario | Behavior |
|---|---|
| Brief network interruption | Automatic resubscription after reconnection |
| Broker restart | Subscriptions automatically restored when broker comes back online |
| MaestroHub restart | All triggers for enabled pipelines are restored on startup |
To minimize message loss during brief outages, configure your MQTT Subscribe function with QoS 1 or QoS 2 and ensure your broker supports message persistence.
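The recovery behavior above can be pictured as a small registry that the hub keeps independently of the live connection and replays on every (re)connect. The sketch below is illustrative only — class and method names are assumptions, not MaestroHub internals.

```python
class TriggerSubscriptions:
    """Illustrative sketch of resubscription-on-reconnect: the hub keeps its
    own registry of trigger subscriptions and replays it whenever the MQTT
    client reports a (re)connect. All names here are hypothetical."""

    def __init__(self):
        self.registry = []   # (topic, qos) pairs for enabled triggers
        self.active = set()  # what the live connection currently holds

    def add(self, topic, qos=1):
        self.registry.append((topic, qos))
        self.active.add((topic, qos))

    def on_disconnect(self):
        # Broker-side state may be gone; forget the live view.
        self.active.clear()

    def on_connect(self):
        # Re-establish every registered subscription after reconnect.
        for sub in self.registry:
            self.active.add(sub)
```

This is why the table above can promise "automatic resubscription": the source of truth lives with the trigger configuration, not with the (possibly lost) broker session.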
Configuration Options
Basic Information
| Field | Type | Description |
|---|---|---|
| Node Label | String (Required) | Display name for the node on the pipeline canvas |
| Description | String (Optional) | Explains what this trigger initiates |
Parameters
| Parameter | Type | Default | Required | Constraints | Description |
|---|---|---|---|---|---|
| Connection ID | string | "" | Yes | -- | MQTT connection profile to use. |
| Function ID | string | "" | Yes | -- | Subscribe function within the connection. |
| Trigger Mode | select | "always" | No | always / onChange | always: Trigger on every message. onChange: Only trigger when payload differs. |
| Enabled | boolean | true | No | -- | Enable/disable the trigger. |
| Dedup Max Keys | number | 1000 | If onChange | 1--10,000 | Maximum tracked topic hashes for change detection. |
| Dedup TTL | select | -- | If onChange | 1h / 6h / 12h / 24h / 72h / 168h | Dedup state TTL. Enterprise only. |
The selected function must be an MQTT Subscribe function type. Publish functions cannot be used with MQTT Trigger nodes.
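To make the `onChange` mode, Dedup Max Keys, and Dedup TTL parameters concrete, here is a minimal sketch of payload-change detection. The hashing scheme and eviction policy are assumptions for illustration; MaestroHub's actual implementation may differ.

```python
import hashlib
import time
from collections import OrderedDict

class ChangeDetector:
    """Sketch of onChange trigger mode: fire only when a topic's payload
    hash differs from the last one seen for that topic. Hash choice and
    LRU-style eviction are assumptions, not documented behavior."""

    def __init__(self, max_keys=1000, ttl_seconds=3600):
        self.max_keys = max_keys    # Dedup Max Keys (1-10,000)
        self.ttl = ttl_seconds      # Dedup TTL
        self._seen = OrderedDict()  # topic -> (payload hash, timestamp)

    def should_trigger(self, topic, payload: bytes, now=None):
        now = time.monotonic() if now is None else now
        digest = hashlib.sha256(payload).hexdigest()
        entry = self._seen.get(topic)
        # Unseen topics, expired entries, and changed payloads all trigger.
        if entry is None or now - entry[1] > self.ttl or entry[0] != digest:
            self._seen[topic] = (digest, now)
            self._seen.move_to_end(topic)
            # Evict the oldest tracked topic once the cap is exceeded.
            while len(self._seen) > self.max_keys:
                self._seen.popitem(last=False)
            return True
        # Same payload within TTL: suppress the trigger.
        return False
```

Note the consequence of the key cap: once a topic's hash is evicted, the next message on that topic triggers again even if the payload is unchanged, so size Dedup Max Keys to cover the number of distinct topics your filter can match.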
Settings
Description
A free-text area for documenting the node's purpose and behavior. Notes entered here are saved with the pipeline and visible to all team members.
Execution Settings
| Setting | Options | Default | Description |
|---|---|---|---|
| Timeout (seconds) | number | Pipeline default | Maximum execution time for this node (1--600). Leave empty for pipeline default. |
| Retry on Timeout | Pipeline Default / Enabled / Disabled | Pipeline Default | Whether to retry the node if it times out. |
| Retry on Fail | Pipeline Default / Enabled / Disabled | Pipeline Default | Whether to retry on failure. When Enabled, shows Advanced Retry Configuration. |
| On Error | Pipeline Default / Stop Pipeline / Continue Execution | Pipeline Default | Behavior when node fails after all retries. |
Advanced Retry Configuration (visible when Retry on Fail = Enabled)
| Field | Type | Default | Range | Description |
|---|---|---|---|---|
| Max Attempts | number | 3 | 1--10 | Maximum retry attempts. |
| Initial Delay (ms) | number | 1000 | 100--30,000 | Wait before first retry. |
| Max Delay (ms) | number | 120000 | 1,000--300,000 | Upper bound for backoff delay. |
| Multiplier | number | 2.0 | 1.0--5.0 | Exponential backoff multiplier. |
| Jitter Factor | number | 0.1 | 0--0.5 | Random jitter (± percentage of the delay). |
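The retry table implies a standard exponential-backoff schedule. The sketch below shows how those five fields could combine; whether MaestroHub applies the Max Delay cap before or after jitter is an assumption here.

```python
import random

def retry_delays(max_attempts=3, initial_ms=1000, max_ms=120_000,
                 multiplier=2.0, jitter=0.1, rng=random.random):
    """Sketch of the backoff schedule implied by the table:
    delay_n = min(initial * multiplier**n, max), then +/- jitter percent.
    The cap-before-jitter ordering is an assumption."""
    delays = []
    for attempt in range(max_attempts):
        base = min(initial_ms * multiplier ** attempt, max_ms)
        # Jitter Factor 0.1 spreads each delay by up to +/-10%.
        spread = base * jitter
        delays.append(base + (2 * rng() - 1) * spread)
    return delays
```

With the defaults and jitter disabled, the three attempts would wait roughly 1 s, 2 s, and 4 s; the Max Delay field only matters once `initial * multiplier**n` grows past it.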
Output Data Structure
When an MQTT message triggers pipeline execution, the following data is available to downstream nodes via the $input variable.
Output Format
```json
{
  "type": "mqtt_trigger",
  "connectionId": "bf29be94-fc0a-4dc4-8e5c-092f1b74eb4b",
  "functionId": "aef374c3-aa2b-454e-aabc-5657faac5950",
  "enabled": true,
  "node_id": "trigger.mqtt-1",
  "message": "MQTT trigger provides configuration for event-based triggering",
  "payload": {
    "temperature": 23.5,
    "unit": "celsius"
  }
}
```
Accessing Message Data
In downstream nodes, use the $input variable to access the trigger output:
| Field | Expression | Description |
|---|---|---|
| Message Payload | $input.payload | The parsed MQTT message content (JSON object or raw value) |
| Connection ID | $input.connectionId | The MQTT connection profile used |
| Function ID | $input.functionId | The MQTT Subscribe function that received the message |
| Node ID | $input.node_id | The trigger node identifier |
| Trigger Type | $input.type | Always mqtt_trigger |
If your MQTT messages contain JSON, access nested fields directly:
- `$input.payload.temperature` — access a specific field
- `$input.payload` — access the entire message object
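As a concrete illustration, these expression lookups map onto plain nested key access. The dictionary below mirrors the Output Format sample above (values illustrative).

```python
# A dict mirroring the Output Format sample above (values illustrative).
trigger_output = {
    "type": "mqtt_trigger",
    "node_id": "trigger.mqtt-1",
    "payload": {"temperature": 23.5, "unit": "celsius"},
}

# $input.payload.temperature corresponds to this nested lookup:
temperature = trigger_output["payload"]["temperature"]

# $input.payload yields the whole message object:
message = trigger_output["payload"]
```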
Validation Rules
The MQTT Trigger Node enforces these validation requirements:
Parameter Validation
Connection ID
- Must be provided and non-empty
- Must reference a valid MQTT connection profile
- Error: "connectionId is required"
Function ID
- Must be provided and non-empty
- Must reference a valid MQTT Subscribe function
- Function must belong to the specified connection
- Error: "functionId is required"
Enabled Flag
- Must be a boolean if provided
- Error: "enabled must be a boolean"
Settings Validation
On Error
- Must be one of: `stopPipeline`, `continueExecution`, `retryNode`
- Error: "onError must be a valid error handling option"
Usage Examples
IoT Sensor Data Ingestion
Scenario: Process temperature readings from factory sensors in real-time.
Configuration:
- Label: Factory Temperature Monitor
- Connection: Production MQTT Broker
- Function: Subscribe to `sensors/temperature/#` (all temperature topics)
- Enabled: true
Downstream Processing:
- Parse JSON payload to extract temperature value
- Check if temperature exceeds threshold
- Send alert if threshold exceeded
- Store reading in time-series database
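The first two downstream steps — parse the JSON payload and compare against a threshold — can be sketched as below. The threshold value and field names follow the Output Format sample; they are assumptions, not fixed by the node.

```python
import json

THRESHOLD_C = 80.0  # assumed alert threshold for this example

def process_reading(raw_payload: bytes) -> dict:
    """Sketch of the first downstream steps: parse the MQTT payload
    (available as $input.payload) and flag threshold breaches.
    Field names follow the Output Format sample above."""
    reading = json.loads(raw_payload)
    temp = reading["temperature"]
    return {"temperature": temp, "alert": temp > THRESHOLD_C}
```

In a real pipeline the parse step is usually unnecessary — the trigger already exposes JSON payloads as objects under `$input.payload` — but the comparison logic is the same.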
Command Processing
Scenario: Execute machine commands received from a central control system.
Configuration:
- Label: Machine Command Handler
- Connection: Control System MQTT
- Function: Subscribe to `commands/machine-01/+` (all command types for machine-01)
- Enabled: true
Downstream Processing:
- Validate command structure
- Route to appropriate PLC write node based on command type
- Log command execution result
- Publish acknowledgment to response topic
Multi-Plant Event Aggregation
Scenario: Aggregate events from multiple manufacturing plants for centralized monitoring.
Configuration:
- Label: Plant Event Aggregator
- Connection: Enterprise MQTT Broker
- Function: Subscribe to `plants/+/events/#` (all events from all plants)
- Enabled: true
Downstream Processing:
- Extract plant ID from topic
- Normalize event format
- Route critical events to alerting system
- Store all events in central data lake
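The three examples above lean on MQTT's two wildcard characters: `+` matches exactly one topic level, `#` matches all remaining levels. A minimal sketch of that matching rule, plus the "extract plant ID from topic" step, follows (the spec additionally requires `#` to be the final level of a filter):

```python
def topic_matches(filter_: str, topic: str) -> bool:
    """Sketch of MQTT topic-filter matching: '+' matches exactly one
    level, '#' matches the remaining levels (and must be last)."""
    f_parts = filter_.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True               # '#' swallows the rest of the topic
        if i >= len(t_parts):
            return False              # filter is longer than the topic
        if f != "+" and f != t_parts[i]:
            return False              # literal level mismatch
    return len(f_parts) == len(t_parts)

def plant_id(topic: str) -> str:
    """Extract the plant ID from topics shaped like plants/<id>/events/...
    (the topic layout is the one assumed in the example above)."""
    return topic.split("/")[1]
```

This also shows why `plants/+/events/#` is safer than a bare `#`: the literal `plants` and `events` levels bound the subscription, while `+` still admits every plant.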
Best Practices
Connection Health Monitoring
- Configure your MQTT connection with Keep Alive intervals appropriate for your network (default: 60 seconds)
- Enable Auto Reconnect on the connection profile for automatic recovery
- Monitor connection status through MaestroHub's connection health dashboard
Designing for Reliability
| Practice | Rationale |
|---|---|
| Use QoS 1 or 2 for critical messages | Ensures delivery acknowledgment and reduces message loss |
| Enable Clean Session = false (MQTT v3.1.1) or Session Expiry > 0 (MQTT v5.0) | Allows broker to queue messages during brief disconnections |
| Keep topic filters specific | Reduces unnecessary message processing and improves performance |
| Use wildcard topics judiciously | Broad wildcards (#) can result in high message volumes |
Error Handling Strategies
For Critical Workflows:
- Set On Error to `stopPipeline`
- Monitor pipeline execution failures
- Implement alerting for stopped pipelines
For Best-Effort Processing:
- Set On Error to `continueExecution`
- Log failed messages for later analysis
- Ensure downstream nodes handle partial failures gracefully
Performance Considerations
| Scenario | Recommendation |
|---|---|
| High message volume | Use specific topic filters; consider message batching in downstream nodes |
| Large payloads | Ensure adequate memory for payload processing; consider streaming for very large messages |
| Multiple triggers on same topic | MaestroHub optimizes this with shared subscriptions—no action needed |
| Burst traffic | Monitor pipeline execution queue; consider rate limiting in upstream systems |
Enable vs. Disable
- Use the trigger's Enabled parameter to temporarily pause message processing without changing pipeline state
- Disable triggers during maintenance windows to prevent message processing
- Document the reason for disabled triggers in the node's Description field