Closed
5 changes: 5 additions & 0 deletions plugins/outputs/all/snowpipe_streaming.go
@@ -0,0 +1,5 @@
//go:build !custom || outputs || outputs.snowpipe_streaming

package all

import _ "github.com/influxdata/telegraf/plugins/outputs/snowpipe_streaming" // register plugin
201 changes: 201 additions & 0 deletions plugins/outputs/snowpipe_streaming/README.md
@@ -0,0 +1,201 @@
# Snowpipe Streaming Output Plugin

This plugin writes metrics to [Snowflake][snowflake] using efficient batch
inserts via the [gosnowflake][gosnowflake] driver with array binding, which
leverages Snowpipe Streaming internally for low-latency, high-throughput
ingest without staging files.
Comment on lines +4 to +6
Member:

These are implementation/developer-targeted details, so we should remove them as this documentation is user-focused.

Suggested change
inserts via the [gosnowflake][gosnowflake] driver with array binding, which
leverages Snowpipe Streaming internally for low-latency, high-throughput
ingest without staging files.
inserts.


[snowflake]: https://www.snowflake.com/
[gosnowflake]: https://github.com/snowflakedb/gosnowflake

⭐ Telegraf v1.35.0
Member:
I think this is

Suggested change
⭐ Telegraf v1.35.0
⭐ Telegraf v1.39.0

;-)

🏷️ cloud, datastore
💻 all

## Prerequisites

1. A Snowflake account with a database and schema already created.
2. Key-pair authentication configured for the Snowflake user:
- Generate an RSA key pair:

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
```

- Assign the public key to the user:

```sql
ALTER USER my_user SET RSA_PUBLIC_KEY='<public key contents>';
```

3. The user must have INSERT privileges on the target table(s).
4. If `create_table = true`, the user must also have CREATE TABLE privileges.
Comment on lines +15 to +33
Member:

Please cut this down to the truly Telegraf-related details. For the key-pair generation, please link to the upstream documentation, as nobody will maintain this here if things change in the database.


## Global configuration options
Member:

Please keep the inclusion "comment", which is used to auto-insert the global configuration section so we can change it in one common place.

Suggested change
## Global configuration options
## Global configuration options <!-- @/docs/includes/plugin_config.md -->


Plugins support additional global and plugin configuration settings for tasks
such as modifying metrics, tags, and fields, creating aliases, and configuring
plugin ordering. See [CONFIGURATION.md][CONFIGURATION.md] for more details.

[CONFIGURATION.md]: ../../../docs/CONFIGURATION.md#plugins

## Configuration

```toml @sample.conf
# Stream metrics to Snowflake via Snowpipe Streaming
[[outputs.snowpipe_streaming]]
## Snowflake account identifier (e.g. "xy12345.us-east-1")
account = ""

## Snowflake username for key-pair authentication
user = ""

## Path to RSA private key file (PEM format) for key-pair auth
private_key_path = ""

## Optional passphrase for the RSA private key
# private_key_passphrase = ""

## Snowflake role to use
# role = ""

## Target database name
database = ""

## Target schema name
schema = ""

## Target table name
## Supports Go templates with access to metric properties:
## {{.Name}} - metric name
## {{.Tag "key"}} - tag value
## Example: "metrics_{{.Name}}" routes each metric name to a separate table
table = ""
Member:
Can we please have the metric name as default here?


## Number of rows per insert batch
# batch_size = 1000

## Maximum number of retries on transient errors
# retry_max = 3

## Delay between retries (exponential backoff base)
# retry_delay = "1s"

## Column name to store the metric timestamp
# timestamp_column = "timestamp"

## Restrict which tags to include as columns (empty = all tags)
# tag_columns = []

## Restrict which fields to include as columns (empty = all fields)
# field_columns = []
Comment on lines +88 to +92
Member:
Why do we need this? This can already be done using the global taginclude/exclude and fieldinclude/exclude options. What does this add here?


## Automatically create the target table if it does not exist
# create_table = false

## How long to cache table schema information
# table_schema_cache_ttl = "5m"
Member:

Does this value really make sense? To be honest, I would expect most users not to change the schema at runtime...

```

## Table Schema

Each metric is stored as a row with the following column mapping:

| Column | Type | Source |
|--------------------|---------------|-----------------------|
| `timestamp` | TIMESTAMP_NTZ | Metric timestamp |
| `name` | VARCHAR | Metric name |
| *(each tag key)* | VARCHAR | Tag value |
| *(each field key)* | varies | Field value |
Comment on lines +105 to +110
Member:
Wouldn't it be sufficient to just mention the "special" timestamp and name column?


Field type mapping:

| Go Type | Snowflake Type |
|-----------------|----------------|
| int64, uint64 | NUMBER |
| float64 | DOUBLE |
| bool | BOOLEAN |
| string | VARCHAR |
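The mapping above can be sketched as a small Go helper. The `snowflakeType` name is illustrative only, not taken from the plugin source:

```go
package main

import "fmt"

// snowflakeType returns the Snowflake column type used for a field
// value, mirroring the mapping table above. Illustrative helper, not
// the plugin's actual function.
func snowflakeType(v interface{}) string {
	switch v.(type) {
	case int64, uint64:
		return "NUMBER"
	case float64:
		return "DOUBLE"
	case bool:
		return "BOOLEAN"
	case string:
		return "VARCHAR"
	default:
		// Telegraf field values are limited to the types above,
		// so this branch is defensive only.
		return "VARCHAR"
	}
}

func main() {
	fmt.Println(snowflakeType(int64(42))) // NUMBER
	fmt.Println(snowflakeType(0.5))       // DOUBLE
}
```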
Comment on lines +114 to +119
Member:
To be honest this is exactly what you would expect so why do you need to add this here?


When `create_table = true`, the plugin will create the table with appropriate
types. When new tags or fields appear, columns are automatically added via
Comment on lines +121 to +122
Member:
This adds no info compared to the sample-configuration...

`ALTER TABLE ADD COLUMN`.

## Example Configurations

### Basic — single table

```toml
[[outputs.snowpipe_streaming]]
account = "xy12345.us-east-1"
user = "TELEGRAF_USER"
private_key_path = "/etc/telegraf/snowflake_key.p8"
database = "TELEMETRY"
schema = "PUBLIC"
table = "METRICS"
create_table = true
```

### Template-based table routing

```toml
[[outputs.snowpipe_streaming]]
account = "xy12345.us-east-1"
user = "TELEGRAF_USER"
private_key_path = "/etc/telegraf/snowflake_key.p8"
database = "TELEMETRY"
schema = "RAW"
table = "metrics_{{.Name}}"
create_table = true
```
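Templates like `metrics_{{.Name}}` are rendered with Go's `text/template` package. A minimal sketch of that expansion, assuming a `metric` value that exposes `Name` and a `Tag` method (the struct and the `renderTableName` helper are illustrative, not plugin code):

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// metric is a stand-in for Telegraf's metric interface; only the
// pieces the table template needs are modeled here.
type metric struct {
	Name string
	tags map[string]string
}

// Tag returns the value of a tag key, matching the {{.Tag "key"}} form.
func (m metric) Tag(key string) string { return m.tags[key] }

// renderTableName executes the table-name template against one metric.
func renderTableName(tmpl string, m metric) (string, error) {
	t, err := template.New("table").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, m); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	m := metric{Name: "cpu", tags: map[string]string{"host": "db01"}}
	name, err := renderTableName("metrics_{{.Name}}", m)
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // metrics_cpu
}
```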

### Specific columns only

```toml
[[outputs.snowpipe_streaming]]
account = "xy12345.us-east-1"
user = "TELEGRAF_USER"
private_key_path = "/etc/telegraf/snowflake_key.p8"
database = "TELEMETRY"
schema = "PUBLIC"
table = "CPU_METRICS"
tag_columns = ["host", "cpu"]
field_columns = ["usage_idle", "usage_user", "usage_system"]
batch_size = 5000
```
Comment on lines +125 to +166
Member:
No. This does not add any value and this will be outdated soon if someone adds a new option etc.


## Troubleshooting

### Authentication errors

Ensure your RSA key pair is correctly configured:

```sql
DESC USER my_user;
```

Check that `RSA_PUBLIC_KEY_FP` is set and matches your key.

### Permission errors

The user/role must have the required grants:

```sql
GRANT USAGE ON DATABASE telemetry TO ROLE my_role;
GRANT USAGE ON SCHEMA telemetry.public TO ROLE my_role;
GRANT INSERT ON TABLE telemetry.public.metrics TO ROLE my_role;
-- If using create_table = true:
GRANT CREATE TABLE ON SCHEMA telemetry.public TO ROLE my_role;
```

### Transient errors and retries

The plugin automatically retries on transient errors (connection resets,
timeouts, service unavailable) with exponential backoff. Increase `retry_max`
and `retry_delay` for unreliable networks.

### NaN/Inf field values

Fields containing NaN or Inf float values are inserted as NULL to avoid
Member:
Suggested change
Fields containing NaN or Inf float values are inserted as NULL to avoid
Fields containing `NaN` or infinite float values are inserted as `NULL` to avoid

Snowflake errors.
53 changes: 53 additions & 0 deletions plugins/outputs/snowpipe_streaming/sample.conf
@@ -0,0 +1,53 @@
# Stream metrics to Snowflake via Snowpipe Streaming
[[outputs.snowpipe_streaming]]
## Snowflake account identifier (e.g. "xy12345.us-east-1")
account = ""

## Snowflake username for key-pair authentication
user = ""

## Path to RSA private key file (PEM format) for key-pair auth
private_key_path = ""

## Optional passphrase for the RSA private key
# private_key_passphrase = ""

## Snowflake role to use
# role = ""

## Target database name
database = ""

## Target schema name
schema = ""

## Target table name
## Supports Go templates with access to metric properties:
## {{.Name}} - metric name
## {{.Tag "key"}} - tag value
## Example: "metrics_{{.Name}}" routes each metric name to a separate table
table = ""

## Number of rows per insert batch
# batch_size = 1000
Comment on lines +31 to +32
Member:

The `metric_batch_size` is a global setting that should determine the batch size instead of adding another parameter!


## Maximum number of retries on transient errors
# retry_max = 3

## Delay between retries (exponential backoff base)
# retry_delay = "1s"

## Column name to store the metric timestamp
# timestamp_column = "timestamp"

## Restrict which tags to include as columns (empty = all tags)
# tag_columns = []

## Restrict which fields to include as columns (empty = all fields)
# field_columns = []

## Automatically create the target table if it does not exist
# create_table = false

## How long to cache table schema information
# table_schema_cache_ttl = "5m"