MySQL Integration Guide
Connect MaestroHub pipelines to MySQL databases for flexible, high-performance data ingestion and orchestration. This guide covers connection setup, security configuration, and query functions tailored for MySQL workloads.
Overview
The MySQL connector brings relational database capabilities to your MaestroHub pipelines:
- Read and write data using standard SQL queries with full parameter support
- Secure connections with five SSL/TLS modes including mutual TLS (mTLS)
- Broad compatibility with MySQL 5.7+, MariaDB 10.2+, and Aurora MySQL
Connection Configuration
Creating a MySQL Connection
Navigate to Connections → New Connection → MySQL and configure the following:
MySQL Connection Creation Fields
1. Profile Information
| Field | Default | Description |
|---|---|---|
| Profile Name | - | A descriptive name for this connection profile (required, max 100 characters) |
| Description | - | Optional description for this MySQL connection |
2. Database Configuration
| Field | Default | Description |
|---|---|---|
| Host | localhost | MySQL server hostname or IP address (required) |
| Port | 3306 | MySQL server port (1-65535) |
| Database | - | Database name to connect to (required) |
| Connect Timeout (sec) | 30 | Maximum time to wait for connection establishment (0-600 seconds) |
Note: Compatible with MySQL 5.7+, MariaDB 10.2+, and Aurora MySQL.
3. Basic Authentication
| Field | Default | Description |
|---|---|---|
| Username | - | MySQL database user |
| Password | - | MySQL user password |
4. SSL/TLS Settings
4a. SSL Mode
| Field | Default | Description |
|---|---|---|
| SSL Mode | preferred | TLS/SSL connection mode (disabled / preferred / required / verify-ca / verify-identity) |
SSL Mode Options
- `disabled`: No encryption — data transmitted in plain text
- `preferred`: Use TLS if the server supports it; fall back to unencrypted otherwise (default)
- `required`: Enforce TLS encryption; fail if the server does not support it
- `verify-ca`: Enforce TLS and verify the server certificate against a trusted CA
- `verify-identity`: Enforce TLS, verify the CA, and also verify the server hostname matches the certificate
4b. Certificate Configuration
(Only displayed when SSL Mode is verify-ca or verify-identity)
| Field | Default | Description |
|---|---|---|
| CA Certificate | - | Trusted CA certificate in PEM format. Only required for self-signed or private CA certificates; leave empty for public CAs |
| Client Certificate | - | Optional client certificate in PEM format for mutual TLS (mTLS) |
| Private Key | - | Private key corresponding to the client certificate in PEM format |
5. Connection Pool Settings
| Field | Default | Description |
|---|---|---|
| Max Open Connections | 100 | Maximum number of simultaneous database connections (1-1000). Higher values increase concurrency but add DB load |
| Max Idle Connections | 25 | Idle connections kept ready for reuse to reduce latency (0-1000, 0 = close idle connections immediately) |
| Connection Max Lifetime (sec) | 900 | Maximum age of a connection before it is recycled (0-86400 seconds, 0 = keep connections indefinitely). Helps avoid server-side timeouts |
| Connection Max Idle Time (sec) | 300 | How long an idle connection may remain unused before being closed (0-86400 seconds, 0 = disable idle timeout) |
Connection pool settings help optimize database performance by balancing concurrency, resource usage, and latency across workloads.
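The numeric ranges in the table above can be captured in a small validation helper. This is a hypothetical sketch — the field names are illustrative and are not MaestroHub's actual configuration keys:

```python
# Hypothetical range validator for the pool settings listed above.
# Field names are illustrative; MaestroHub's real config keys may differ.
POOL_LIMITS = {
    "max_open_connections": (1, 1000),
    "max_idle_connections": (0, 1000),
    "conn_max_lifetime_sec": (0, 86400),
    "conn_max_idle_time_sec": (0, 86400),
}

def validate_pool_settings(settings: dict) -> list:
    """Return a list of error messages for out-of-range values."""
    errors = []
    for key, (lo, hi) in POOL_LIMITS.items():
        value = settings.get(key)
        if value is None:
            continue  # unset fields fall back to their defaults
        if not lo <= value <= hi:
            errors.append(f"{key}={value} outside allowed range {lo}-{hi}")
    return errors

print(validate_pool_settings({"max_open_connections": 100}))  # → []
```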
6. Retry Configuration
| Field | Default | Description |
|---|---|---|
| Retries | 3 | Number of connection retry attempts (0-10, 0 = no retry) |
| Retry Delay (ms) | 200 | Initial delay between retry attempts in milliseconds (0-3,600,000 ms) |
| Retry Backoff Multiplier | 2 | Exponential factor for retry delay growth (1.0-10.0, e.g., 2 means delay doubles each retry) |
Retry Behavior Example
- 1st retry: wait 200ms
- 2nd retry: wait 400ms (200 × 2)
- 3rd retry: wait 800ms (400 × 2)
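The schedule above follows directly from the defaults (200 ms base delay, multiplier 2). A minimal sketch of the delay computation, assuming the documented semantics:

```python
def retry_delays(retries: int, base_ms: float, multiplier: float) -> list:
    """Delay before each retry attempt: base * multiplier^(attempt - 1)."""
    return [base_ms * multiplier ** i for i in range(retries)]

# Defaults from the table: 3 retries, 200 ms base, 2x backoff.
print(retry_delays(3, 200, 2))  # → [200, 400, 800]
```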
7. Connection Labels
| Field | Default | Description |
|---|---|---|
| Labels | - | Key-value pairs to categorize and organize this MySQL connection (max 10 labels) |
Example Labels
env: prod— Environmentteam: data— Responsible teamregion: us-east— Geographical region
- Required Fields: All fields described as "required" must be filled.
- SSL/TLS: When SSL Mode is set to `disabled`, no encryption is used. Use `preferred` for opportunistic encryption, or `required` and above for enforced encryption.
- Certificate Fields: CA Certificate, Client Certificate, and Private Key are only shown when SSL Mode is `verify-ca` or `verify-identity`.
- Connection Pooling: Manages concurrency with Max Open Connections, reduces latency with Max Idle Connections, mitigates stale connections via Connection Max Lifetime, and frees resources using Connection Max Idle Time.
- Retry Logic: Implements exponential backoff — the first retry waits for the base delay, and each subsequent retry multiplies the delay by the backoff factor. Ideal for transient network issues or database restarts.
- Timeout Values: Connect Timeout applies to initial connection establishment; all timeout values are in seconds unless noted (Retry Delay uses milliseconds).
- Security Best Practices: Always use `required` or stricter SSL modes for production. Prefer `verify-identity` and supply CA certificates for maximum security. Use client certificates for mutual TLS when possible.
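As a rough illustration of the mode ordering behind the "required or stricter" recommendation, the five SSL modes can be ranked by strictness. This is a hypothetical helper, not part of the connector:

```python
# SSL modes ordered from least to most strict, per the guide.
SSL_MODES = ["disabled", "preferred", "required", "verify-ca", "verify-identity"]

def is_at_least(mode: str, minimum: str) -> bool:
    """True if `mode` is at least as strict as `minimum`."""
    return SSL_MODES.index(mode) >= SSL_MODES.index(minimum)

# Production check: enforce "required" or stricter.
print(is_at_least("verify-ca", "required"))  # → True
print(is_at_least("preferred", "required"))  # → False
```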
Function Builder
Creating MySQL Functions
Once you have a connection established, you can create reusable functions for different operation types:
- Navigate to Functions → New Function
- Select one of the MySQL function types: Query, Execute, or Write
- Choose your MySQL connection
- Configure the function parameters

MySQL function creation interface showing available function types: Query, Execute, and Write
Query Function
Purpose: Execute SQL queries (SELECT) to read data from MySQL. Returns rows as structured data.
Configuration Fields
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| SQL Query | String | Yes | - | SQL SELECT statement to execute. Supports parameterized queries with ((param)) syntax. |
| Timeout (seconds) | Number | No | 0 | Per-execution timeout in seconds (0 = no limit). |
Use Cases:
- SELECT machine KPIs (OEE, downtime) from telemetry tables
- SELECT COUNT(*) from orders grouped by status
- SELECT with JOINs across multiple tables
- SELECT with WHERE filters and ORDER BY
Execute Function
Purpose: Execute DML/DDL statements (INSERT, UPDATE, DELETE, CREATE, ALTER, DROP) on MySQL. Returns rowsAffected instead of row data. Use this for data modifications and schema changes.
Configuration Fields
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| SQL Statement | String | Yes | - | DML/DDL SQL statement to execute. Supports parameterized queries with ((param)) syntax. |
| Timeout (seconds) | Number | No | 0 | Per-execution timeout in seconds (0 = no limit). |
Use Cases:
- INSERT production events (start/stop, shift logs, quality checks)
- UPDATE work orders, inventory counts, or maintenance schedules
- DELETE or archive old telemetry and audit logs
- CREATE TABLE or ALTER TABLE for schema changes
Write Function
Purpose: Write pipeline data to a MySQL table with automatic schema detection. This is not a SQL editor — it loads structured data (rows/objects) from your pipeline into a target table.
Load data into a MySQL table with two modes: Auto (default) detects the table schema and maps incoming data fields to columns automatically; Manual lets you specify exact columns for precise control. Supports batching for efficient bulk loading.
Configuration Fields
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
| Table Name | String | Yes | - | Target table to insert data into. |
| Write Mode | String | No | auto | auto = detect schema and map fields automatically, manual = specify exact columns. |
| Columns | Array | No | - | Column names to insert into (manual mode only). |
| Create Table If Not Exists | Boolean | No | false | Auto-create the table if it does not exist (auto mode only). When enabled, also performs schema evolution — adds missing columns and widens types as needed. |
| Batch Size | Number | No | 100 | Number of rows per INSERT batch (1–10,000). |
| Timeout (seconds) | Number | No | 0 | Per-execution timeout in seconds (0 = no limit). |
Use Cases:
- Auto-detect table schema and map pipeline data
- Create tables on-the-fly from pipeline data
- Manual column mapping for precise control
- Bulk insert production events with auto schema detection
How Write Works Step by Step
1. Batching
Each batch generates a single INSERT INTO `table` (`col1`, `col2`) VALUES (?, ?), (?, ?) statement with multiple value tuples. If your pipeline sends 10 rows and Batch Size is 5, exactly 2 INSERT statements are executed (2 batches of 5 rows). If it sends 13 rows, 3 INSERT statements are executed (5 + 5 + 3).
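The batch arithmetic described above can be sketched as follows — an assumed behavior matching the 13-row example:

```python
def make_batches(rows: list, batch_size: int) -> list:
    """Split rows into consecutive batches of at most batch_size each."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]

# 13 rows with Batch Size 5 → three INSERT statements of 5, 5, and 3 rows.
batches = make_batches(list(range(13)), 5)
print([len(b) for b in batches])  # → [5, 5, 3]
```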
2. Column Matching
Write uses case-insensitive matching between incoming data field names and table column names. For example, a data field named deviceId will match a table column DEVICEID or deviceid. The original column casing from the database is preserved in the generated INSERT statement.
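A sketch of this matching rule, assuming a simple lowercase comparison and preserving the database's original column casing:

```python
def match_columns(data_fields: list, table_columns: list):
    """Match data fields to table columns case-insensitively.

    Returns (matched_columns, skipped_fields); matched names keep the
    database's original casing, as the generated INSERT statement does.
    """
    by_lower = {col.lower(): col for col in table_columns}
    matched, skipped = [], []
    for field in data_fields:
        col = by_lower.get(field.lower())
        if col is not None:
            matched.append(col)
        else:
            skipped.append(field)
    return matched, skipped

print(match_columns(["deviceId", "temp"], ["DEVICEID", "pressure"]))
# → (['DEVICEID'], ['temp'])
```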
3. Table Exists — Normal Flow
- The function first queries `INFORMATION_SCHEMA` to check if the target table exists
- If the table exists, it fetches all column metadata (name, type, nullability)
- It matches incoming data fields to table columns (case-insensitive)
- Fields in data that don't match any column are silently skipped — they are tracked and returned in the response metadata as `skippedFields`
- Batch INSERT statements are generated and executed using only the matched columns
4. Table Does Not Exist
| Create Table If Not Exists | Behavior |
|---|---|
| Disabled (default) | Returns an error immediately: "table 'X' does not exist. Enable 'Create table if not exists' or create it manually". No insert is attempted. |
| Enabled, no schema provided | Types are inferred from the first row of data using the rules below. A CREATE TABLE is executed, then all rows (including the first) are inserted — no data loss. |
| Enabled, with schema | The provided schema is used for CREATE TABLE with full column definitions (types, primary keys, indexes). After creation, all rows are inserted. |
Data Type Inference — When no schema is provided, the Write function infers column types from the first row of data:
| Data Value | Inferred Type | Notes |
|---|---|---|
| `true` / `false` | TINYINT(1) | JSON boolean values |
| `42`, `-7`, `1000` | BIGINT | Whole numbers (no fractional part) |
| `3.14`, `0.5` | DOUBLE | Numbers with fractional part |
| `"2024-01-15T10:30:00Z"` | DATETIME | Strings matching ISO 8601 / RFC 3339 and other common timestamp formats are automatically detected |
| `"hello"`, `"sensor-01"` | TEXT | All other strings |
| `null` | TEXT | Null values default to text |
| `{"key": "val"}`, `[1,2]` | JSON | Complex objects and arrays |
Integer vs. decimal distinction is based on whether the number has a fractional part: 55 → BIGINT, 55.0 → DOUBLE. If you need a specific numeric type (e.g., DECIMAL, INT), define it in the schema using the Visual Editor.
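The inference rules above can be sketched as follows. This is an assumed behavior, not the connector's actual code; note that booleans must be checked before integers, since `bool` is an `int` subtype in Python:

```python
import re

# Rough ISO 8601 / RFC 3339 timestamp prefix, e.g. "2024-01-15T10:30:00Z".
ISO_TS = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")

def infer_mysql_type(value) -> str:
    """Infer a MySQL column type from a JSON-like value, per the table above."""
    if isinstance(value, bool):
        return "TINYINT(1)"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, str):
        return "DATETIME" if ISO_TS.match(value) else "TEXT"
    if value is None:
        return "TEXT"
    return "JSON"  # dicts, lists, and other complex values

print(infer_mysql_type(55))    # → BIGINT
print(infer_mysql_type(55.0))  # → DOUBLE
```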
5. Schema Evolution (Table Exists + Create Table If Not Exists Enabled)
When Create Table If Not Exists is enabled and the table already exists, the function performs schema evolution before inserting:
- New columns: If data contains fields not present in the table, `ALTER TABLE ADD COLUMN` is executed for each missing field. If a schema is defined in the Visual Editor, the specified type is used; otherwise, the type is inferred from the first row of data values.
- Type widening: If an existing column's type is too narrow for the incoming data, it is safely widened using the rules below.
- After schema evolution completes, column metadata is re-fetched from the database to ensure accuracy, then all rows are inserted with the updated column set.
Schema evolution only widens types — it never narrows or changes types across incompatible categories. The following rules apply:
| Rule | Direction | Examples |
|---|---|---|
| Same category, higher rank | TINYINT → SMALLINT → INT → BIGINT | Integer grows to hold larger values |
| Same category, higher rank | FLOAT → DOUBLE | Float gains precision |
| Same category, higher rank | VARCHAR → TEXT → MEDIUMTEXT → LONGTEXT → JSON | String grows to hold larger content |
| Cross-category (boolean → integer) | TINYINT(1) → INT | Boolean promoted to integer |
| Cross-category (integer → float) | BIGINT → DOUBLE | Integer promoted to float |
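The table above amounts to a rank check within each chain, plus the two allowed cross-category promotions. A hypothetical helper illustrating the rule:

```python
# Widening chains from the table above; a type may only move rightward.
INT_CHAIN = ["TINYINT", "SMALLINT", "INT", "BIGINT"]
FLOAT_CHAIN = ["FLOAT", "DOUBLE"]
TEXT_CHAIN = ["VARCHAR", "TEXT", "MEDIUMTEXT", "LONGTEXT", "JSON"]

def can_widen(current: str, target: str) -> bool:
    """True if `current` may be widened to `target` under the rules above."""
    for chain in (INT_CHAIN, FLOAT_CHAIN, TEXT_CHAIN):
        if current in chain and target in chain:
            return chain.index(current) < chain.index(target)
    # Allowed cross-category promotions: boolean → integer, integer → float.
    if current == "TINYINT(1)" and target in INT_CHAIN:
        return True
    if current in INT_CHAIN and target in FLOAT_CHAIN:
        return True
    return False

print(can_widen("INT", "BIGINT"))     # → True  (integer grows)
print(can_widen("BIGINT", "INT"))     # → False (narrowing not allowed)
print(can_widen("BIGINT", "DOUBLE"))  # → True  (integer → float promotion)
```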
The following changes are not supported by schema evolution and require manual ALTER TABLE statements:
- Narrowing types (e.g., `BIGINT` → `INT`, `TEXT` → `VARCHAR`)
- Incompatible category changes (e.g., `TEXT` → `DECIMAL`, `DOUBLE` → `TINYINT(1)`, `DATETIME(6)` → `INT`)
- Changing an already-created column's type by updating the Visual Editor schema alone — the schema definition only affects new columns being added
- Dropping columns — removing a column from the Visual Editor schema does not delete it from the database table. The column remains in the table and receives `NULL` (or its default value) for new inserts.
If you need to change an existing column's type or drop a column, use the Execute function to run ALTER TABLE ... MODIFY COLUMN ... or ALTER TABLE ... DROP COLUMN ... manually.
If Create Table If Not Exists is disabled and data has extra columns, those fields are silently skipped — no error, no schema change.
6. Partial Failure
Each batch executes independently — there is no transaction wrapping all batches. If batch 3 of 5 fails, batches 1 and 2 have already been committed. The error response still includes rowsInserted showing how many rows succeeded before the failure.
Response Metadata
On success, the response includes:
| Field | Description |
|---|---|
| `rowsInserted` | Total number of rows successfully inserted |
| `matchedColumns` | Column names that had matching data fields |
| `skippedFields` | Data fields that didn't match any table column (if any) |
| `schemaEvolution` | ALTER TABLE actions performed — add or widen (if any) |
| `tableCreated` | `true` if the table was created during this call |
| `batchSize` | Batch size used |
| `totalRows` | Total number of input rows |
Using Parameters
MaestroHub detects the ((parameterName)) syntax and exposes each parameter for validation and runtime binding for Query and Execute functions.
| Configuration | Description | Example |
|---|---|---|
| Type | Enforce data types for incoming values | string, number, boolean, date, array |
| Required | Mark parameters as mandatory or optional | Required / Optional |
| Default Value | Fallback value applied when callers omit a parameter | NOW(), 0, 'active' |
| Description | Add guidance for downstream users | "Start date for the report" |

Parameter configuration with validation rules and helper text
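The `((parameterName))` syntax can be illustrated with a small parser. This is a hypothetical sketch of extracting parameter names and converting them to MySQL driver placeholders — MaestroHub's actual binding logic is internal:

```python
import re

# Matches ((name)) markers in the SQL text.
PARAM = re.compile(r"\(\((\w+)\)\)")

def bind_params(sql: str, values: dict):
    """Replace ((name)) markers with %s placeholders and order the values."""
    names = PARAM.findall(sql)
    missing = [n for n in names if n not in values]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    return PARAM.sub("%s", sql), [values[n] for n in names]

sql, args = bind_params(
    "SELECT * FROM work_orders WHERE status = ((status)) AND line = ((line))",
    {"status": "open", "line": 3},
)
print(sql)   # → SELECT * FROM work_orders WHERE status = %s AND line = %s
print(args)  # → ['open', 3]
```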
Pipeline Integration
Use the MySQL functions you create here as nodes inside the Pipeline Designer to integrate your MySQL databases with the rest of your operations stack. Drag the appropriate node type onto the canvas, bind its parameters to upstream outputs or constants, and configure connection-level options without leaving the designer.
Each function type maps to a dedicated pipeline node — Query, Execute, and Write — so you can clearly separate read, write, and data-loading operations in your flows.
For end-to-end orchestration ideas, such as combining database reads with MQTT, REST, or analytics steps, explore the Connector Nodes page to see how SQL nodes complement other automation patterns.
Common Use Cases
Query: Reading Production Metrics
Scenario: Generate hourly production reports with efficiency metrics from MySQL tables.
```sql
SELECT
    DATE_FORMAT(event_time, '%Y-%m-%d %H:00:00') AS hour_bucket,
    line_id,
    COUNT(*) AS event_count,
    ROUND(AVG(efficiency), 2) AS avg_efficiency,
    MIN(efficiency) AS min_efficiency,
    MAX(efficiency) AS max_efficiency
FROM production_events
WHERE event_time >= ((startDate)) AND event_time < ((endDate))
GROUP BY hour_bucket, line_id
ORDER BY hour_bucket DESC;
```
Pipeline Integration: Feed data to visualization dashboards, BI tools, or reporting nodes.
Execute: Updating Work Order Status
Scenario: Track manufacturing progress by updating work order status in real-time.
```sql
UPDATE work_orders
SET
    status = ((newStatus)),
    completed_units = ((completedUnits)),
    completion_percentage = ROUND(((completedUnits)) / total_units * 100, 2),
    updated_at = NOW(),
    updated_by = ((userId))
WHERE order_id = ((orderId));
```
Pipeline Integration: Trigger this function based on production events, barcode scans, or manual workflows.
Execute: Data Retention and Cleanup
Scenario: Maintain database performance by archiving or deleting old data.
```sql
INSERT INTO sensor_readings_archive
SELECT * FROM sensor_readings
WHERE recorded_at < DATE_SUB(NOW(), INTERVAL ((retentionDays)) DAY)
LIMIT 10000;

DELETE FROM sensor_readings
WHERE recorded_at < DATE_SUB(NOW(), INTERVAL ((retentionDays)) DAY)
LIMIT 10000;
```
Pipeline Integration: Schedule this function to run daily or weekly via pipeline triggers (cron jobs).
Write: Bulk Loading Sensor Data
Scenario: Load real-time sensor readings from IoT devices into a MySQL table.
Configure a Write function with:
- Table Name: `sensor_readings`
- Write Mode: `auto`
- Create Table If Not Exists: enabled
Connect it after data collection nodes (MQTT, OPC UA, Modbus, etc.) in your pipeline. The Write function automatically maps incoming fields (sensor_id, temperature, pressure, vibration) to table columns and inserts them in batches. If the table doesn't exist, it is created automatically with inferred types.