Version: 2.1

PostgreSQL Integration Guide

Connect to PostgreSQL databases to read and write industrial data in your pipelines. This comprehensive guide walks you through everything from basic setup to advanced configurations.

Overview

The PostgreSQL connector is your gateway to relational database operations in MaestroHub. It enables you to:

  • Read data from tables, views, or custom queries
  • Write data with insert, update, or delete operations
  • Use parameterized queries for dynamic, reusable data operations
  • Secure connections with SSL/TLS encryption
TimescaleDB Support

The PostgreSQL connector also supports TimescaleDB connections for time-series data.

Connection Configuration

Creating a PostgreSQL Connection

Navigate to Connections → New Connection → PostgreSQL and configure the following:

PostgreSQL Connection Creation Fields

1. Profile Information
Field | Default | Description
Profile Name | - | A descriptive name for this connection profile (required, max 100 characters)
Description | - | Optional description for this PostgreSQL connection
2. Database Configuration
Field | Default | Description
Host | localhost | PostgreSQL server hostname or IP address (required)
Port | 5432 | PostgreSQL server port (1-65535) (required)
Connect Timeout (sec) | 30 | Maximum time to wait for connection establishment (0-600 seconds) (required)
Schema | public | Database schema to use (required)
Database | - | Database name to connect to (e.g., mydb) (required)

Note: Supported PostgreSQL versions: 9.6+

3. Basic Authentication
Field | Default | Description
Username | - | PostgreSQL database user (required)
Password | - | PostgreSQL user password (required)
4. SSL Settings
4a. SSL Configuration
Field | Default | Description
Enable SSL | true | Use encrypted connection to PostgreSQL server
4b. SSL Mode and Certificates

(Only displayed when SSL is enabled)

Field | Default | Description
SSL Mode | require | TLS/SSL connection mode used by the driver (require / verify-ca / verify-full)
CA Certificate | - | Trusted CA certificate (sslrootcert) in PEM format
Client Certificate | - | Optional client certificate (sslcert) in PEM format
Private Key | - | Private key for client certificate (sslkey) in PEM format

SSL Mode Options

  • require: Requires SSL connection but does not verify server certificate
  • verify-ca: Requires SSL and verifies that the server certificate is issued by a trusted CA
  • verify-full: Requires SSL and verifies both the CA and that the server hostname matches the certificate
5. Connection Pool Settings
Field | Default | Description
Max Open Connections | 100 | Maximum number of simultaneous database connections (0-1000). Higher values increase concurrency but add DB load
Max Idle Connections | 25 | Idle connections kept ready for reuse to reduce latency (0-1000, 0 = close idle connections immediately)
Connection Max Lifetime (sec) | 900 | Maximum age of a connection before it is recycled (0-86400 seconds, 0 = keep connections indefinitely). Helpful to avoid server-side timeouts
Connection Max Idle Time (sec) | 300 | How long an idle connection may remain unused before being closed (0-86400 seconds, 0 = disable idle timeout)

Connection pool settings help optimize database performance by balancing concurrency, resource usage, and latency across workloads.

6. Retry Configuration
Field | Default | Description
Retries | 3 | Number of connection attempts (0-10, 0 = no retry)
Retry Delay (ms) | 100 | Delay between retry attempts in milliseconds (0-3600000 ms)
Retry Backoff Multiplier | 2 | Exponential factor for retry delay growth (1-10, e.g., 2.0 means each retry waits twice as long)

Example Retry Behavior

  • With Retry Delay = 100ms and Retry Backoff Multiplier = 2:
    • 1st retry: wait 100ms
    • 2nd retry: wait 200ms
    • 3rd retry: wait 400ms
7. Advanced Features
Field | Default | Description
Use TimescaleDB | false | Enable TimescaleDB-specific optimizations (no DSN change required)
8. Connection Labels
Field | Default | Description
Labels | - | Key-value pairs to categorize and organize this PostgreSQL connection (max 10 labels)

Example Labels

  • env: prod – Environment
  • team: data – Responsible team
Notes
  • Required Fields: All fields described as "required" must be filled.
  • SSL/TLS: When SSL is disabled, SSL Mode is automatically set to disable.
  • Connection Pooling: Manages concurrency with Max Open Connections, reduces latency with Max Idle Connections, mitigates stale connections via Connection Max Lifetime, and frees resources using Connection Max Idle Time.
  • Retry Logic: Implements exponential backoff—initial connection attempts use the base delay and each retry multiplies the delay by the backoff factor; ideal for transient network issues or database restarts.
  • TimescaleDB: When enabled, provides optimizations for time-series workloads.
  • Timeout Values: Connect Timeout applies to initial connection establishment; all timeout values are in seconds unless noted (Retry Delay uses milliseconds).
  • Security Best Practices: Always enable SSL for production, prefer verify-full, supply CA certificates for verify-ca or verify-full, and include client certificates for mutual TLS when possible.

Function Builder

Creating PostgreSQL Functions

Once you have a connection established, you can create reusable functions for different operation types:

  1. Navigate to Functions → New Function
  2. Select one of the PostgreSQL function types: Query, Execute, or Write
  3. Choose your PostgreSQL connection
  4. Configure the function parameters
PostgreSQL Function Creation

PostgreSQL function creation interface showing available function types: Query, Execute, and Write

Query Function

Purpose: Execute SQL queries (SELECT) to read data from PostgreSQL. Returns rows as structured data.

Configuration Fields

Field | Type | Required | Default | Description
SQL Query | String | Yes | - | SQL SELECT statement to execute. Supports parameterized queries with ((param)) syntax.
Timeout (seconds) | Number | No | 0 | Per-execution timeout in seconds (0 = no limit).

Use Cases:

  • SELECT machine KPIs (OEE, downtime) from telemetry tables
  • SELECT COUNT(*) from orders grouped by status
  • SELECT with JOINs across multiple tables using $1 parameters
  • SELECT with WHERE filters, ORDER BY, and LIMIT
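As a small sketch of these patterns, a parameterized SELECT with a filter, grouping, and a limit might look like the following (table, column, and parameter names are illustrative, not part of the product):

SELECT machine_id,
       COUNT(*) AS event_count,
       AVG(oee) AS avg_oee
FROM machine_telemetry
WHERE timestamp >= ((startDate))
GROUP BY machine_id
ORDER BY avg_oee DESC
LIMIT ((maxRows));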

Execute Function

Purpose: Execute DML/DDL statements (INSERT, UPDATE, DELETE, CREATE, ALTER, DROP) on PostgreSQL. Returns rowsAffected instead of row data. Use this for data modifications and schema changes.

Configuration Fields

Field | Type | Required | Default | Description
SQL Statement | String | Yes | - | DML/DDL SQL statement to execute. Supports parameterized queries with ((param)) syntax.
Timeout (seconds) | Number | No | 0 | Per-execution timeout in seconds (0 = no limit).

Use Cases:

  • INSERT production events with NOW() timestamps and $1 parameters
  • UPDATE work orders, inventory counts, or maintenance schedules
  • DELETE records older than NOW() - INTERVAL '30 days'
  • CREATE TABLE or ALTER TABLE for schema changes
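As a brief sketch of the first use case, an Execute statement that inserts a production event with a server-side timestamp might look like this (table, column, and parameter names are illustrative):

-- Insert one production event; ((machineId)) and ((eventType)) are function parameters
INSERT INTO production_events (machine_id, event_type, created_at)
VALUES ( ((machineId)), ((eventType)), NOW() );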

Write Function

Purpose: Write pipeline data to a PostgreSQL table with automatic schema detection. This is not a SQL editor — it loads structured data (rows/objects) from your pipeline into a target table.

Load data into a PostgreSQL table with two modes: Auto (default) detects the table schema and maps incoming data fields to columns automatically; Manual lets you specify exact columns for precise control. Supports batching for efficient bulk loading.

Configuration Fields

Field | Type | Required | Default | Description
Table Name | String | Yes | - | Target table to insert data into.
Write Mode | String | No | auto | auto = detect schema and map fields automatically, manual = specify exact columns.
Columns | Array | No | - | Column names to insert into (manual mode only).
Create Table If Not Exists | Boolean | No | false | Auto-create the table if it does not exist (auto mode only). When enabled, also performs schema evolution — adds missing columns and widens types as needed.
Batch Size | Number | No | 100 | Number of rows per INSERT batch (1–10,000).
Timeout (seconds) | Number | No | 0 | Per-execution timeout in seconds (0 = no limit).

Use Cases:

  • Auto-detect table schema and map pipeline data
  • Create tables on-the-fly with SERIAL PRIMARY KEY columns
  • Manual column mapping for precise control
  • Bulk insert production events with auto schema detection

How Write Works Step by Step

1. Batching

Each batch generates a single INSERT INTO ... VALUES (...), (...), (...) statement with multiple value tuples. If your pipeline sends 10 rows and Batch Size is 5, exactly 2 INSERT statements are executed (2 batches of 5 rows). If it sends 13 rows, 3 INSERT statements are executed (5 + 5 + 3).

note

PostgreSQL has a per-query parameter limit (65,535). If your configured batch size combined with the number of columns would exceed this limit, batches are automatically split into smaller chunks. This is handled transparently — the total number of inserted rows remains the same.
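For illustration, with Batch Size = 3 and three matched columns on a hypothetical sensor_readings table, each generated batch would look roughly like this (presumably built with positional placeholders, given the per-query parameter limit noted above; the exact statement the connector generates may differ):

-- One batch: 3 rows x 3 columns = 9 parameters
INSERT INTO sensor_readings (device_id, temperature, recorded_at)
VALUES ($1, $2, $3),
       ($4, $5, $6),
       ($7, $8, $9);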

2. Column Matching

Write uses case-insensitive matching between incoming data field names and table column names. For example, a data field named deviceId will match a table column DEVICEID or deviceid. The original column casing from the database is preserved in the generated INSERT statement.
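For example, assuming a hypothetical readings table whose column was created as DEVICEID, an incoming deviceId field would be matched and the database's casing kept in the generated statement (illustrative only; quoting details may differ):

-- Incoming field "deviceId" matched to table column "DEVICEID"
INSERT INTO readings ("DEVICEID", temperature) VALUES ($1, $2);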

3. Table Exists — Normal Flow

  1. The function first queries information_schema to check if the target table exists
  2. If the table exists, it fetches all column metadata (name, type, nullability); see the sketch after this list
  3. It matches incoming data fields to table columns (case-insensitive)
  4. Fields in data that don't match any column are silently skipped — they are tracked and returned in the response metadata as skippedFields
  5. Batch INSERT statements are generated and executed using only the matched columns
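The metadata lookup described in steps 1 and 2 is roughly equivalent to a query like the following (illustrative only; the connector's actual query is not documented here and may differ):

SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'sensor_readings';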

4. Table Does Not Exist

Create Table If Not Exists | Behavior
Disabled (default) | Returns an error immediately: "table 'X' does not exist. Enable 'Create table if not exists' or create it manually". No insert is attempted.
Enabled, no schema provided | Types are inferred from the first row of data using the rules below. A CREATE TABLE is executed, then all rows (including the first) are inserted — no data loss.
Enabled, with schema | The provided schema is used for CREATE TABLE with full column definitions (types, primary keys, indexes). After creation, all rows are inserted.

Data Type Inference — When no schema is provided, the Write function infers column types from the first row of data:

Data Value | Inferred Type | Notes
true / false | BOOLEAN | JSON boolean values
42, -7, 1000 | BIGINT | Whole numbers (no fractional part)
3.14, 0.5 | DOUBLE PRECISION | Numbers with fractional part
"2024-01-15T10:30:00Z" | TIMESTAMPTZ | Strings matching ISO 8601 / RFC 3339 and other common timestamp formats are automatically detected
"hello", "sensor-01" | TEXT | All other strings
null | TEXT | Null values default to text
{"key": "val"}, [1,2] | JSONB | Complex objects and arrays
info

Integer vs. decimal distinction is based on whether the number has a fractional part: 55 → BIGINT, 55.0 → DOUBLE PRECISION. If you need a specific numeric type (e.g., NUMERIC, SMALLINT), define it in the schema using the Visual Editor.
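Putting the rules together, a hypothetical first row such as {"device_id": "sensor-01", "temperature": 21.5, "is_active": true, "recorded_at": "2024-01-15T10:30:00Z"} would produce a table definition roughly like this (the exact DDL the connector generates, including any key columns, may differ):

CREATE TABLE sensor_readings (
    device_id   TEXT,
    temperature DOUBLE PRECISION,
    is_active   BOOLEAN,
    recorded_at TIMESTAMPTZ
);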

5. Schema Evolution (Table Exists + Create Table If Not Exists Enabled)

When Create Table If Not Exists is enabled and the table already exists, the function performs schema evolution before inserting:

  • New columns: If data contains fields not present in the table, ALTER TABLE ADD COLUMN is executed for each missing field. If a schema is defined in the Visual Editor, the specified type is used; otherwise, the type is inferred from the first row of data values.
  • Type widening: If an existing column's type is too narrow for the incoming data, it is safely widened using the rules below.
  • After schema evolution completes, column metadata is re-fetched from the database to ensure accuracy, then all rows are inserted with the updated column set.

Schema evolution only widens types — it never narrows or changes types across incompatible categories. The following rules apply:

Rule | Direction | Notes
Same category, higher rank | SMALLINT → INTEGER → BIGINT | Integer grows to hold larger values
Same category, higher rank | REAL → DOUBLE PRECISION | Float gains precision
Same category, higher rank | VARCHAR → TEXT → JSONB | String grows to hold larger content
Cross-category (boolean → integer) | BOOLEAN → INTEGER | Boolean promoted to integer
Cross-category (integer → float) | BIGINT → DOUBLE PRECISION | Integer promoted to float
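For illustration, if incoming data introduces a new humidity field with fractional values and starts sending fractional values into an existing BIGINT column named quantity, the evolution step would issue statements roughly like these (illustrative only; the exact DDL may differ):

-- Add a missing column (type from the Visual Editor schema, or inferred from the data)
ALTER TABLE sensor_readings ADD COLUMN humidity DOUBLE PRECISION;
-- Widen an integer column to hold fractional values (from BIGINT to DOUBLE PRECISION)
ALTER TABLE sensor_readings ALTER COLUMN quantity TYPE DOUBLE PRECISION;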

The following changes are not supported by schema evolution and require manual ALTER TABLE statements:

  • Narrowing types (e.g., BIGINT → SMALLINT, TEXT → VARCHAR)
  • Incompatible category changes (e.g., TEXT → NUMERIC, DOUBLE PRECISION → BOOLEAN, TIMESTAMPTZ → INTEGER)
  • Changing an already-created column's type by updating the Visual Editor schema alone — the schema definition only affects new columns being added
  • Dropping columns — removing a column from the Visual Editor schema does not delete it from the database table. The column remains in the table and receives NULL (or its default value) for new inserts.
tip

If you need to change an existing column's type or drop a column, use the Execute function to run ALTER TABLE ... ALTER COLUMN ... TYPE ... or ALTER TABLE ... DROP COLUMN ... manually.
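For example, an Execute function could run statements along these lines (table, column, and type names are placeholders):

ALTER TABLE sensor_readings ALTER COLUMN temperature TYPE NUMERIC(10, 2);
ALTER TABLE sensor_readings DROP COLUMN obsolete_field;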

If Create Table If Not Exists is disabled and data has extra columns, those fields are silently skipped — no error, no schema change.

6. Partial Failure

Each batch executes independently — there is no transaction wrapping all batches. If batch 3 of 5 fails, batches 1 and 2 have already been committed. The error response still includes rowsInserted showing how many rows succeeded before the failure.

Response Metadata

On success, the response includes:

Field | Description
rowsInserted | Total number of rows successfully inserted
matchedColumns | Column names that had matching data fields
skippedFields | Data fields that didn't match any table column (if any)
schemaEvolution | ALTER TABLE actions performed — add or widen (if any)
tableCreated | true if the table was created during this call
batchSize | Batch size used
totalRows | Total number of input rows

Using Parameters

The ((parameterName)) syntax creates dynamic, reusable queries for Query and Execute functions. Parameters are automatically detected and can be configured with:

Configuration | Description | Example
Type | Data type validation | string, number, boolean, date, array
Required | Make parameters mandatory or optional | Required / Optional
Default Value | Fallback value if not provided | NOW(), 0, active
Description | Help text for users | "Start date for the report"
PostgreSQL Function Parameters Configuration

Parameter configuration interface showing type validation, required flags, default values, and descriptions
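As a short sketch, a Query function using a required ((status)) parameter and a ((limit)) parameter with a default value might look like this (table, column, and parameter names are illustrative):

SELECT order_id, status, created_at
FROM orders
WHERE status = ((status))
ORDER BY created_at DESC
LIMIT ((limit));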

Pipeline Integration

Use the PostgreSQL functions you create here as nodes inside the Pipeline Designer to move data in and out of your database alongside the rest of your operations stack. Drag the appropriate node type onto the canvas, bind its parameters to upstream outputs or constants, and configure connection-level options without leaving the designer.

Each function type maps to a dedicated pipeline node — Query, Execute, and Write — so you can clearly separate read, write, and data-loading operations in your flows.

For end-to-end orchestration ideas, such as combining database reads with MQTT, REST, or analytics steps, explore the Connector Nodes page to see how SQL nodes complement other automation patterns.

PostgreSQL Query Node in Pipeline Designer

PostgreSQL Query node in the pipeline designer

Common Use Cases

Query: Reading Production Metrics

Scenario: Generate hourly production reports with efficiency metrics.

SELECT
    DATE_TRUNC('hour', timestamp) AS hour,
    machine_id,
    COUNT(*) AS event_count,
    AVG(efficiency) AS avg_efficiency,
    MIN(efficiency) AS min_efficiency,
    MAX(efficiency) AS max_efficiency
FROM production_events
WHERE timestamp >= ((startDate)) AND timestamp < ((endDate))
GROUP BY hour, machine_id
ORDER BY hour DESC;

Pipeline Integration: Use in a pipeline to feed data to visualization dashboards, BI tools, or reporting nodes.


Execute: Updating Work Order Status

Scenario: Track manufacturing progress by updating work order status in real-time.

UPDATE work_orders
SET
    status = ((newStatus)),
    completed_units = ((completedUnits)),
    completion_percentage = ROUND((((completedUnits))::numeric / total_units * 100), 2),
    updated_at = NOW(),
    updated_by = ((userId))
WHERE order_id = ((orderId))
RETURNING order_id, status, completion_percentage;

Pipeline Integration: Trigger this function based on production events, barcode scans, or manual workflows.


Execute: Data Retention and Cleanup

Scenario: Maintain database performance by archiving or deleting old data.

WITH archived AS (
    INSERT INTO sensor_readings_archive
    SELECT * FROM sensor_readings
    WHERE timestamp < NOW() - INTERVAL '((retentionDays)) days'
      AND archived = false
    RETURNING id
)
DELETE FROM sensor_readings
WHERE id IN (SELECT id FROM archived);

Pipeline Integration: Schedule this function to run daily or weekly via pipeline triggers (cron jobs).


Write: Bulk Loading Sensor Data

Scenario: Load real-time sensor readings from IoT devices into a PostgreSQL table.

Configure a Write function with:

  • Table Name: sensor_readings
  • Write Mode: auto
  • Create Table If Not Exists: enabled

Connect it after data collection nodes (MQTT, OPC UA, Modbus, etc.) in your pipeline. The Write function automatically maps incoming fields (sensor_id, temperature, pressure, vibration) to table columns and inserts them in batches. If the table doesn't exist, it is created automatically with inferred types.