Skip to main content
Version: 2.0-beta.1
MQTT Trigger Node interface

MQTT trigger node

MQTT Trigger Node

Overview

The MQTT Trigger Node automatically initiates MaestroHub pipelines when messages arrive on subscribed MQTT topics. Unlike the MQTT Subscribe connector node which receives messages within an already-running pipeline, the MQTT Trigger starts new pipeline executions in response to incoming messages—enabling fully event-driven automation.


Core Functionality

What It Does

MQTT Trigger enables real-time, event-driven pipeline execution by:

1. Event-Driven Pipeline Execution
Start pipelines automatically when MQTT messages arrive, without manual intervention or polling. Perfect for IoT sensor ingestion, command processing, and real-time data flows.

2. Automatic Subscription Management
Subscriptions are created and cleaned up automatically—no manual subscription management required.

3. Message Payload Passthrough
Incoming MQTT message payloads are passed directly to the pipeline, making them available to all downstream nodes via the $input variable.


Reconnection Handling

MaestroHub automatically handles connection disruptions to ensure reliable message delivery.

Automatic Recovery

When an MQTT connection is lost and restored:

  1. Connection Lost: The system detects the disconnection automatically
  2. Connection Restored: Subscriptions are automatically re-established within seconds
  3. Transparent Recovery: Pipelines continue to receive messages once the connection is restored—no manual intervention required

What This Means for Your Workflows

ScenarioBehavior
Brief network interruptionAutomatic resubscription after reconnection
Broker restartSubscriptions automatically restored when broker comes back online
MaestroHub restartAll triggers for enabled pipelines are restored on startup
QoS and Message Retention

To minimize message loss during brief outages, configure your MQTT Subscribe function with QoS 1 or QoS 2 and ensure your broker supports message persistence.


Configuration Options

MQTT Trigger Parameters

Parameters Tab

Parameters Configuration

MQTT Trigger Settings

Settings Tab

Settings Configuration

Basic Information

FieldTypeDescription
Node LabelString (Required)Display name for the node on the pipeline canvas
DescriptionString (Optional)Explains what this trigger initiates

Parameters

ParameterTypeRequiredDefaultDescription
ConnectionConnection IDYes-The MQTT connection profile to use for subscribing
FunctionFunction IDYes-The MQTT Subscribe function that defines topic filters and subscription options
EnabledBooleanNotrueWhen disabled, the subscription is not created even if the pipeline is enabled
Function Requirement

The selected function must be an MQTT Subscribe function type. Publish functions cannot be used with MQTT Trigger nodes.


Settings

Basic Settings

SettingOptionsDefaultRecommendation
Retry on Failtrue / falsefalseKeep disabled for triggers—retries don't apply to event-driven nodes
Error HandlingStop Pipeline / ContinueStop PipelineUse "Stop Pipeline" to halt on critical failures

Documentation Settings

SettingTypeDefaultPurpose
NotesTextEmptyDocument the trigger's purpose, expected message formats, or operational context
Display Note in PipelineBooleanfalseShow notes on the pipeline canvas for quick reference

Output Data Structure

When an MQTT message triggers pipeline execution, the following data is available to downstream nodes via the $input variable.

Output Format

{
"type": "mqtt_trigger",
"connectionId": "bf29be94-fc0a-4dc4-8e5c-092f1b74eb4b",
"functionId": "aef374c3-aa2b-454e-aabc-5657faac5950",
"enabled": true,
"node_id": "trigger.mqtt-1",
"message": "MQTT trigger provides configuration for event-based triggering",
"payload": {
"temperature": 23.5,
"unit": "celsius"
}
}

Accessing Message Data

In downstream nodes, use the $input variable to access the trigger output:

FieldExpressionDescription
Message Payload$input.payloadThe parsed MQTT message content (JSON object or raw value)
Connection ID$input.connectionIdThe MQTT connection profile used
Function ID$input.functionIdThe MQTT Subscribe function that received the message
Node ID$input.node_idThe trigger node identifier
Trigger Type$input.typeAlways mqtt_trigger
Accessing Nested Payload Data

If your MQTT messages contain JSON, access nested fields directly:

  • $input.payload.temperature — Access a specific field
  • $input.payload — Access the entire message object

Validation Rules

The MQTT Trigger Node enforces these validation requirements:

Parameter Validation

Connection ID

  • Must be provided and non-empty
  • Must reference a valid MQTT connection profile
  • Error: "connectionId is required"

Function ID

  • Must be provided and non-empty
  • Must reference a valid MQTT Subscribe function
  • Function must belong to the specified connection
  • Error: "functionId is required"

Enabled Flag

  • Must be a boolean if provided
  • Error: "enabled must be a boolean"

Settings Validation

On Error

  • Must be one of: stopPipeline, continueExecution, retryNode
  • Error: "onError must be a valid error handling option"

Usage Examples

IoT Sensor Data Ingestion

Scenario: Process temperature readings from factory sensors in real-time.

Configuration:

  • Label: Factory Temperature Monitor
  • Connection: Production MQTT Broker
  • Function: Subscribe to sensors/temperature/# (all temperature topics)
  • Enabled: true

Downstream Processing:

  • Parse JSON payload to extract temperature value
  • Check if temperature exceeds threshold
  • Send alert if threshold exceeded
  • Store reading in time-series database

Command Processing

Scenario: Execute machine commands received from a central control system.

Configuration:

  • Label: Machine Command Handler
  • Connection: Control System MQTT
  • Function: Subscribe to commands/machine-01/+ (all command types for machine-01)
  • Enabled: true

Downstream Processing:

  • Validate command structure
  • Route to appropriate PLC write node based on command type
  • Log command execution result
  • Publish acknowledgment to response topic

Multi-Plant Event Aggregation

Scenario: Aggregate events from multiple manufacturing plants for centralized monitoring.

Configuration:

  • Label: Plant Event Aggregator
  • Connection: Enterprise MQTT Broker
  • Function: Subscribe to plants/+/events/# (all events from all plants)
  • Enabled: true

Downstream Processing:

  • Extract plant ID from topic
  • Normalize event format
  • Route critical events to alerting system
  • Store all events in central data lake

Best Practices

Connection Health Monitoring

  • Configure your MQTT connection with Keep Alive intervals appropriate for your network (default: 60 seconds)
  • Enable Auto Reconnect on the connection profile for automatic recovery
  • Monitor connection status through MaestroHub's connection health dashboard

Designing for Reliability

PracticeRationale
Use QoS 1 or 2 for critical messagesEnsures delivery acknowledgment and reduces message loss
Enable Clean Session = false (MQTT v3.1.1) or Session Expiry > 0 (MQTT v5.0)Allows broker to queue messages during brief disconnections
Keep topic filters specificReduces unnecessary message processing and improves performance
Use wildcard topics judiciouslyBroad wildcards (#) can result in high message volumes

Error Handling Strategies

For Critical Workflows:

  • Set On Error to stopPipeline
  • Monitor pipeline execution failures
  • Implement alerting for stopped pipelines

For Best-Effort Processing:

  • Set On Error to continueExecution
  • Log failed messages for later analysis
  • Ensure downstream nodes handle partial failures gracefully

Performance Considerations

ScenarioRecommendation
High message volumeUse specific topic filters; consider message batching in downstream nodes
Large payloadsEnsure adequate memory for payload processing; consider streaming for very large messages
Multiple triggers on same topicMaestroHub optimizes this with shared subscriptions—no action needed
Burst trafficMonitor pipeline execution queue; consider rate limiting in upstream systems

Enable vs. Disable

  • Use the trigger's Enabled parameter to temporarily pause message processing without changing pipeline state
  • Disable triggers during maintenance windows to prevent message processing
  • Document the reason for disabled triggers in the node's Notes field