Version: 2.0-dev

InfluxDB Integration Guide

Connect to InfluxDB to read and write time-series data in your pipelines. This guide covers connection setup, function configuration, and pipeline integration for InfluxDB 2.x deployments.

Overview

The InfluxDB connector enables integration with InfluxDB time-series databases, commonly used for storing metrics, telemetry, IoT data, and industrial signals. It provides:

  • Flux query support for flexible time-series data retrieval with filtering and aggregation
  • Line Protocol writes for efficient data ingestion with measurements, tags, and typed fields
  • Secure token authentication with TLS/HTTPS support and custom CA certificates
  • Batching and retry controls for reliable high-throughput data operations
  • Template parameters for dynamic queries and writes based on runtime input
InfluxDB Version Support

This connector is optimized for InfluxDB 2.x with its bucket/organization model and Flux query language. Support for InfluxDB 1.x and 3.x is planned for future releases.

Connection Configuration

Creating an InfluxDB Connection

Navigate to Connections → New Connection → InfluxDB and configure the following:

InfluxDB Connection Creation Fields

1. Profile Information
| Field | Default | Description |
| --- | --- | --- |
| Profile Name | - | A descriptive name for this connection profile (required, max 100 characters) |
| Description | - | Optional description for this InfluxDB connection |
2. Connection Settings
| Field | Default | Description |
| --- | --- | --- |
| Server URL | - | InfluxDB server URL (e.g., `http://localhost:8086` or `https://us-east-1-1.aws.cloud2.influxdata.com`) – required |
| Organization | - | InfluxDB organization name – required |
| HTTP Request Timeout (sec) | 20 | Timeout for HTTP requests to InfluxDB (1-300 seconds) |
3. Authentication
| Field | Default | Description |
| --- | --- | --- |
| Auth Token | - | API token for authentication – required. Masked on edit; leave empty to keep the stored value |
4. TLS/Security Settings
| Field | Default | Description |
| --- | --- | --- |
| CA Certificate | - | Custom CA certificate in PEM format for server validation |
| Skip Verify | false | Skip server certificate verification (use only for development/testing) |
Security Notice

Enabling Skip Verify disables TLS certificate validation. Use only in trusted development environments, never in production.

5. Batching Settings
| Field | Default | Description |
| --- | --- | --- |
| Use GZip | false | Enable GZip compression for write requests to reduce bandwidth |
| Batch Size | 5000 | Maximum number of points per write batch (1-100,000) |
| Flush Interval (ms) | 1000 | Maximum time before flushing buffered points (100-60,000 ms) |
| Precision | ns | Timestamp precision for writes: ns (nanoseconds), us (microseconds), ms (milliseconds), or s (seconds) |
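The size-or-interval flush behavior described above can be sketched as follows. This is a minimal illustration of the batching concept, not the connector's actual internals; the `PointBuffer` class and its `sink` callback are hypothetical names.

```python
import time

class PointBuffer:
    """Sketch: buffer points and flush when either Batch Size or
    Flush Interval is reached, whichever comes first."""

    def __init__(self, batch_size=5000, flush_interval_ms=1000, sink=print):
        self.batch_size = batch_size
        self.flush_interval = flush_interval_ms / 1000.0
        self.sink = sink                      # called with each flushed batch
        self.points = []
        self.last_flush = time.monotonic()

    def add(self, point):
        self.points.append(point)
        # Flush on whichever limit is hit first: size or elapsed time.
        if (len(self.points) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_interval):
            self.flush()

    def flush(self):
        if self.points:
            self.sink(self.points)
        self.points = []
        self.last_flush = time.monotonic()
```

With `batch_size=3`, four added points yield one automatic flush of three points, leaving the fourth buffered until the interval elapses or `flush()` is called.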
6. Retry Configuration
| Field | Default | Description |
| --- | --- | --- |
| Max Retries | 5 | Maximum retry attempts for failed requests (0-20, 0 = disable retries) |
| Retry Interval (ms) | 5000 | Initial delay between retry attempts (100-60,000 ms) |
| Max Retry Interval (ms) | 125000 | Maximum delay cap for exponential backoff (1,000-300,000 ms) |
| Max Retry Time (ms) | 180000 | Total time limit for all retry attempts (1,000-600,000 ms) |
| Retry Buffer Limit | 50000 | Maximum points to buffer for retry (1,000-1,000,000) |
7. Advanced Settings
| Field | Default | Description |
| --- | --- | --- |
| Default Tags | - | Key-value pairs automatically added to all written data points (max 10 tags) |

Example Default Tags

  • environment: production
  • source: maestrohub
  • site: factory-1
8. Connection Labels
| Field | Default | Description |
| --- | --- | --- |
| Labels | - | Key-value pairs to categorize and organize this InfluxDB connection (max 10 labels) |

Example Labels

  • env: prod – Environment
  • team: data-platform – Responsible team
  • region: us-east-1 – Deployment region
Notes
  • Required Fields: Server URL, Organization, and Auth Token must be provided.
  • Authentication: InfluxDB 2.x uses token-based authentication. Generate tokens in the InfluxDB UI under Load Data → API Tokens.
  • Batching: The connector buffers write points and flushes them based on Batch Size or Flush Interval, whichever is reached first. This improves throughput for high-frequency writes.
  • Retry Logic: Implements exponential backoff for transient failures. The delay doubles after each retry up to Max Retry Interval.
  • Default Tags: Useful for adding consistent metadata (e.g., source system, environment) to all data points without modifying individual writes.
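The exponential-backoff behavior described in the Retry Logic note can be sketched as a schedule of delays: each delay doubles up to Max Retry Interval, and retries stop once the cumulative delay would exceed Max Retry Time. The function below is an illustration of that rule using the default settings, not the connector's actual retry code.

```python
def backoff_schedule(max_retries=5, retry_interval_ms=5000,
                     max_retry_interval_ms=125000, max_retry_time_ms=180000):
    """Sketch: compute the retry delays implied by the settings above.
    The delay doubles after each retry, capped at Max Retry Interval,
    and the schedule stops when Max Retry Time would be exceeded."""
    delays, delay, total = [], retry_interval_ms, 0
    for _ in range(max_retries):
        if total + delay > max_retry_time_ms:
            break
        delays.append(delay)
        total += delay
        delay = min(delay * 2, max_retry_interval_ms)
    return delays
```

With the defaults, this yields delays of 5, 10, 20, 40, and 80 seconds (155 s total, under the 180 s Max Retry Time).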

Function Builder

Creating InfluxDB Functions

Once you have a connection established, you can create reusable query and write functions:

  1. Navigate to Functions → New Function
  2. Select InfluxDB Query or InfluxDB Write as the function type
  3. Choose your InfluxDB connection
  4. Configure the function parameters
InfluxDB Function Creation

Design reusable InfluxDB Query and Write functions with Flux queries, field definitions, and parameter configuration

Query Function

Purpose: Execute Flux queries against InfluxDB to retrieve time-series data. Use this for reading metrics, aggregating data, and filtering by time range or tags.

InfluxDB Query Configuration

Query function configuration with Flux query editor and timeout settings

Configuration Fields

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| Flux Query | String | Yes | - | Flux query to execute. Supports template parameters using `((paramName))` syntax. |
| Timeout (seconds) | Number | No | 30 | Query execution timeout (1-300 seconds) |

Example Flux Queries

```flux
// Retrieve temperature readings from the last hour
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "factory-1")
```

```flux
// Calculate hourly averages for the past 24 hours
from(bucket: "metrics")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> aggregateWindow(every: 1h, fn: mean)
  |> yield(name: "hourly_avg")
```

```flux
// Query with template parameters
from(bucket: "((bucket))")
  |> range(start: ((startTime)), stop: ((endTime)))
  |> filter(fn: (r) => r._measurement == "((measurement))")
  |> filter(fn: (r) => r.asset_id == "((assetId))")
```

Response Format

```json
{
  "success": true,
  "records": [
    {"_time": "2024-01-15T10:00:00Z", "_value": 23.5, "location": "factory-1"},
    {"_time": "2024-01-15T10:01:00Z", "_value": 23.7, "location": "factory-1"}
  ],
  "recordCount": 2,
  "organization": "my-org"
}
```
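A downstream pipeline step typically reduces the `records` array to a summary value. As a sketch (assuming the response shape documented above; the `summarize` helper is hypothetical), here is a per-location mean of `_value`:

```python
def summarize(response):
    """Sketch: reduce a query response's records to a mean _value per
    location tag. Assumes the documented response shape."""
    sums = {}
    for rec in response.get("records", []):
        loc = rec.get("location", "unknown")
        total, count = sums.get(loc, (0.0, 0))
        sums[loc] = (total + rec["_value"], count + 1)
    return {loc: total / count for loc, (total, count) in sums.items()}
```

Applied to the example response above, this returns a mean of 23.6 for `factory-1`.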

Use Cases:

  • Retrieve sensor readings for dashboards and analytics
  • Calculate aggregates (mean, max, min) over time windows
  • Filter and export historical data for reporting
  • Monitor equipment metrics with tag-based filtering

Write Function

Purpose: Write data points to InfluxDB using Line Protocol format. Supports explicit field type selection to ensure correct data types are stored.

InfluxDB Write Configuration

Write function configuration with bucket, measurement, tags, fields, and timestamp settings

Configuration Fields

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| Bucket | String | Yes | - | Target bucket for writing data. Supports template parameters. |
| Measurement | String | Yes | - | Measurement name for the data point (similar to a table name). Supports template parameters. |
| Tags | Object | No | {} | Key-value pairs for indexed dimensions used in filtering and grouping. Tag values are always strings. |
| Fields | Array | Yes | - | Array of field definitions with name, value, and type. At least one field is required. |
| Timestamp | String/Number | No | current time | Unix timestamp in nanoseconds. Supports template parameters. If empty, uses current time at execution. |

Field Definition Structure

Each field in the Fields array must contain:

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| name | String | Yes | Field name. Supports template parameters. |
| value | String | Yes | Field value. Supports template parameters (e.g., `((temperature))`). |
| type | Enum | Yes | Data type for the field value. |

Supported Field Types

| Type | Description | Example Values |
| --- | --- | --- |
| float | 64-bit floating point number | 25.5, 3.14159, -10.0 |
| integer | 64-bit signed integer | 100, -50, 0 |
| uinteger | 64-bit unsigned integer | 100, 0 (no negative values) |
| boolean | Boolean value | true, false |
| string | UTF-8 string | "active", "sensor-01" |

Example Configuration

```json
{
  "bucket": "sensors",
  "measurement": "temperature",
  "tags": {
    "location": "factory-1",
    "sensor_id": "temp-001",
    "line": "assembly-a"
  },
  "fields": [
    { "name": "value", "value": "((temperature))", "type": "float" },
    { "name": "humidity", "value": "((humidity))", "type": "float" },
    { "name": "status", "value": "active", "type": "string" }
  ],
  "timestamp": "((timestamp))"
}
```
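To make the Line Protocol serialization concrete, the sketch below turns a measurement, tags, and typed field definitions (the shape used in the configuration above) into a Line Protocol string, following InfluxDB's escaping and type-suffix rules (`i` for integers, `u` for unsigned integers, quoted strings). The helper names are illustrative, not the connector's internals.

```python
def _escape(s, chars):
    """Backslash-escape the given special characters."""
    for c in chars:
        s = s.replace(c, "\\" + c)
    return s

def to_line_protocol(measurement, tags, fields, timestamp_ns=None):
    """Sketch: serialize one point to InfluxDB Line Protocol.
    measurement,tag=value field=value[,field=value] [timestamp]"""
    line = _escape(measurement, [",", " "])
    for k, v in tags.items():  # tag values are always strings
        line += "," + _escape(k, [",", "=", " "]) + "=" + _escape(str(v), [",", "=", " "])
    parts = []
    for f in fields:
        key = _escape(f["name"], [",", "=", " "])
        v, t = f["value"], f["type"]
        if t == "float":
            val = str(float(v))
        elif t == "integer":
            val = f"{int(v)}i"          # signed integers get an 'i' suffix
        elif t == "uinteger":
            val = f"{int(v)}u"          # unsigned integers get a 'u' suffix
        elif t == "boolean":
            val = "true" if str(v).lower() in ("true", "1") else "false"
        else:                            # string: escape and double-quote
            val = '"' + str(v).replace("\\", "\\\\").replace('"', '\\"') + '"'
        parts.append(f"{key}={val}")
    line += " " + ",".join(parts)
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"
    return line
```

For example, a temperature point with one tag and three fields serializes to `temperature,location=factory-1 value=23.5,count=7i,status="active" <timestamp>`.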

Response Format

```json
{
  "success": true,
  "pointsWritten": 1,
  "bucket": "sensors",
  "measurement": "temperature",
  "organization": "my-org"
}
```

Use Cases:

  • Ingest real-time sensor data from industrial equipment
  • Store production metrics and KPIs
  • Log machine states and events with metadata tags
  • Archive telemetry data from IoT devices

Using Parameters

The ((parameterName)) syntax creates dynamic, reusable functions. Parameters are automatically detected and can be configured with:

| Configuration | Description | Example |
| --- | --- | --- |
| Type | Data type validation | string, number, boolean, datetime, json, buffer |
| Required | Make parameters mandatory or optional | Required / Optional |
| Default Value | Fallback value if not provided | sensors, 0, NOW() |
| Description | Help text for users | "Target bucket name", "Sensor temperature reading" |
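Parameter detection and substitution for the `((parameterName))` syntax can be sketched with a regular expression. This is an illustration of the concept only; the platform's own resolution rules (type coercion, escaping) may differ, and `find_params`/`render` are hypothetical names.

```python
import re

_PARAM = re.compile(r"\(\((\w+)\)\)")

def find_params(template):
    """Sketch: return the sorted parameter names referenced via ((name))."""
    return sorted(set(_PARAM.findall(template)))

def render(template, params, defaults=None):
    """Sketch: substitute ((name)) placeholders from params, falling back
    to defaults; raise if a required parameter is missing."""
    defaults = defaults or {}
    def sub(match):
        name = match.group(1)
        if name in params:
            return str(params[name])
        if name in defaults:
            return str(defaults[name])
        raise KeyError(f"missing required parameter: {name}")
    return _PARAM.sub(sub, template)
```

For example, `render('from(bucket: "((bucket))")', {"bucket": "sensors"})` produces the literal query `from(bucket: "sensors")`.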

Pipeline Integration

Use the InfluxDB query and write functions you create here as nodes inside the Pipeline Designer. Drag the function node onto the canvas, bind its parameters to upstream outputs or constants, and configure error handling as needed.

Common patterns include:

  • Read → Transform → Write: Query data from InfluxDB, process it, and write results back
  • Collect → Store: Gather data from OPC UA, MQTT, or Modbus and store in InfluxDB
  • Query → Alert: Monitor metrics and trigger notifications based on thresholds

For broader orchestration patterns that combine InfluxDB with SQL, REST, MQTT, or other connector steps, see the Connector Nodes page.

InfluxDB node in pipeline designer

InfluxDB function node with connection, function, and parameter bindings

Common Use Cases

Storing Industrial Sensor Data

Scenario: Collect temperature, pressure, and vibration readings from manufacturing equipment and store them in InfluxDB for monitoring and analysis.

Write Configuration:

```json
{
  "bucket": "factory-telemetry",
  "measurement": "equipment_sensors",
  "tags": {
    "plant": "chicago",
    "line": "assembly-1",
    "machine_id": "((machineId))"
  },
  "fields": [
    { "name": "temperature", "value": "((temp))", "type": "float" },
    { "name": "pressure", "value": "((pressure))", "type": "float" },
    { "name": "vibration", "value": "((vibration))", "type": "float" },
    { "name": "running", "value": "((isRunning))", "type": "boolean" }
  ]
}
```

Pipeline Integration: Connect after OPC UA or Modbus read nodes to continuously store equipment telemetry.


Querying for Dashboard Visualization

Scenario: Retrieve aggregated metrics for the past 24 hours to display on a real-time dashboard.

Query Configuration:

```flux
from(bucket: "factory-telemetry")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "equipment_sensors")
  |> filter(fn: (r) => r.plant == "((plant))")
  |> aggregateWindow(every: 15m, fn: mean)
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

Pipeline Integration: Use in scheduled pipelines that feed visualization dashboards or BI tools.


Monitoring and Alerting

Scenario: Query recent sensor readings to detect anomalies and trigger alerts when values exceed thresholds.

Query Configuration:

```flux
from(bucket: "factory-telemetry")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "equipment_sensors")
  |> filter(fn: (r) => r._field == "temperature")
  |> filter(fn: (r) => r._value > 85.0)
  |> count()
```

Pipeline Integration: Connect to conditional logic nodes that send alerts via SMTP or MS Teams when anomalies are detected.
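The conditional step after this query can be sketched as a simple predicate on the query response: the Flux `count()` returns the number of over-threshold readings, so the pipeline alerts when that count is non-zero. The `should_alert` helper below is hypothetical, illustrating the decision logic only.

```python
def should_alert(response, min_count=1):
    """Sketch: decide whether to alert from an InfluxDB Query response.
    The query counts readings above the threshold, so any record with a
    _value of at least min_count means an anomaly was seen."""
    if not response.get("success"):
        return False  # failed queries are handled by error-handling nodes
    return any(rec.get("_value", 0) >= min_count
               for rec in response.get("records", []))
```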


Data Migration and Backup

Scenario: Export historical data from InfluxDB for archival or migration to another system.

Query Configuration:

```flux
from(bucket: "((sourceBucket))")
  |> range(start: ((startDate)), stop: ((endDate)))
  |> filter(fn: (r) => r._measurement == "((measurement))")
```

Pipeline Integration: Combine with S3 Write or SQL Insert functions to archive data to cold storage or migrate between systems.