Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,28 @@ We have following examples to show Async Insert in action.
Available options:
- [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - after PrepareBatch connection will be returned to the pool. It can help you make a long-lived batch.

## File upload

The `InsertFile` method allows uploading a file directly to ClickHouse without re-packing or parsing its contents.
If the file is already compressed using a ClickHouse-supported compression algorithm, it will be sent as-is.

This upload method significantly reduces client-side resource usage and is designed for data processing pipelines
where a pre-generated file is received and must be ingested into ClickHouse with minimal overhead.

This method is supported only by the HTTP connector. Attempts to use it with the native protocol will return an error.

In most cases, providing just the file path and an INSERT query is sufficient:
```
INSERT INTO <table> FORMAT <format>
```
or the extended form:
```
INSERT INTO <table> (col1, col2) SETTINGS <setting>=<value> FORMAT <format>
```

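By default, `Content-Type` is derived from the `FORMAT` clause of the query and `Content-Encoding` from the
file extension (for example, `.gz` → `gzip`, `.zst` → `zstd`). If needed, both can be explicitly overridden
using the `WithFileContentType()` and `WithFileEncoding()` query options.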

## Benchmark

| [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) |
Expand Down
37 changes: 36 additions & 1 deletion clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"sync/atomic"
"time"

_ "time/tzdata"

"github.com/ClickHouse/clickhouse-go/v2/contributors"
Expand Down Expand Up @@ -88,6 +87,7 @@ type nativeTransport interface {
prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error)
exec(ctx context.Context, query string, args ...any) error
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
insertFile(ctx context.Context, filePath string, query string) error
ping(context.Context) error
isBad() bool
connID() int
Expand Down Expand Up @@ -215,6 +215,41 @@ func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool,
return nil
}

// InsertFile uploads a local file to ClickHouse as the payload of an INSERT query.
//
// The call acquires a connection, delegates the upload to the transport's
// insertFile and releases the connection afterwards. The file is sent "as-is":
// the driver does not decode, encode, or otherwise transform its contents, so it
// suits INSERT ... FORMAT <fmt> queries whose payload is already prepared and
// optionally compressed (e.g. TSV.zst).
//
// Request headers (HTTP transport):
//   - Content-Encoding comes from the WithFileEncoding query option carried by
//     ctx; when that is unset it is derived from the file extension
//     (e.g. ".zst" → "zstd", ".gz" → "gzip").
//   - Content-Type comes from the WithFileContentType query option; when that is
//     unset it is derived from the FORMAT clause of query.
//
// Parameters:
//   - ctx: request context (cancellation, deadlines, query options).
//   - filePath: path to the file to upload.
//   - query: ClickHouse INSERT query (typically "INSERT INTO <table> FORMAT <format>").
//
// Limitations:
//   - External tables are not supported for file uploads.
//   - Only the HTTP transport implements the upload; the native transport
//     returns an error.
//
// Typical usage:
//
//	err := conn.InsertFile(ctx, "data.tsv.zst", "INSERT INTO db.table FORMAT TSV")
func (ch *clickhouse) InsertFile(ctx context.Context, filePath string, query string) error {
	conn, err := ch.acquire(ctx)
	if err != nil {
		return err
	}
	conn.debugf("[insertFile] \"%s\" with \"%s\"", filePath, query)

	// Report the real outcome to release instead of unconditionally passing nil,
	// so the pool is told when the upload failed on this connection.
	// NOTE(review): release's handling of a non-nil error is outside this view — confirm.
	if err := conn.insertFile(ctx, filePath, query); err != nil {
		ch.release(conn, err)
		return err
	}
	ch.release(conn, nil)
	return nil
}
Comment thread
cl-bvl marked this conversation as resolved.
Outdated

func (ch *clickhouse) Ping(ctx context.Context) (err error) {
conn, err := ch.acquire(ctx)
if err != nil {
Expand Down
158 changes: 158 additions & 0 deletions conn_http_insert_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package clickhouse

import (
"context"
"fmt"
"os"
"regexp"
"strings"
)

// contentEncodingExtensions maps each HTTP Content-Encoding value accepted for
// file uploads to the file-name extensions associated with it. WithFileEncoding
// checks an explicitly requested encoding against the keys of this map.
//
// NOTE(review): encodingFromFileName carries its own list of the same
// extensions — keep the two in sync.
// NOTE(review): ".lz" is conventionally the lzip extension rather than LZ4 —
// confirm the mapping is intended.
var contentEncodingExtensions = map[string][]string{
"gzip": {".gz", ".gzip"},
"br": {".br", ".brotli"},
"deflate": {".deflate"},
"xz": {".xz"},
"zstd": {".zst", ".zstd"},
"lz4": {".lz", ".lz4"},
"bz2": {".bz2"},
"snappy": {".snappy"},
}

// insertFile streams a local file directly to ClickHouse over HTTP as the request body.
//
// The file is sent "as-is" without any decompression or recompression on the client side.
// This method is intended for INSERT ... FORMAT <fmt> queries where the payload is already
// prepared and optionally compressed (e.g. TSV.zst).
//
// Compression handling:
// - If contentEncoding is explicitly provided, it is set as HTTP "Content-Encoding".
// - If contentEncoding is empty, the encoding is auto-detected from the file extension
// (e.g. ".zst" → "zstd", ".gz" → "gzip").
// - The driver does NOT attempt to decode, encode, or transform the stream.
//
// Parameters:
// - ctx: request context (cancellation, deadlines).
// - filePath: path to the file to upload; the file is streamed and not buffered in memory.
// - query: ClickHouse INSERT query (typically "INSERT INTO <table> FORMAT <format>").
//
// Limitations:
// - External tables are not supported for file uploads.
// - This method is available only for the HTTP transport.
//
// Typical usage:
// err := conn.insertFile(ctx, "data.tsv.zst", "text/tab-separated-values", "zstd", "INSERT INTO db.table FORMAT TSV")
//
// On success, the file contents are fully consumed and the request body is discarded.
func (h *httpConnect) insertFile(ctx context.Context, filePath string, query string) error {
f, err := os.OpenFile(filePath, os.O_RDONLY, 0644)
if err != nil {
return err
}
defer f.Close()
Comment thread
cl-bvl marked this conversation as resolved.
Outdated

options := queryOptions(ctx)
options.settings["query"] = query

if len(options.external) > 0 {
return fmt.Errorf("external tables are not supported for file upload")
}
if options.fileEncoding == "" {
options.fileEncoding = encodingFromFileName(f.Name())
}
if options.fileContentType == "" {
options.fileContentType = contentTypeFromFormat(parseFormatFromSQL(query))
if options.fileContentType == "" {
return fmt.Errorf("unknown file Content-Type")
}
}

headers := map[string]string{"Content-Type": options.fileContentType}
if options.fileEncoding != "" {
headers["Content-Encoding"] = options.fileEncoding
}

switch h.compression {
case CompressionZSTD, CompressionLZ4:
options.settings["compress"] = "1"
case CompressionGZIP, CompressionDeflate, CompressionBrotli:
// request encoding
headers["Accept-Encoding"] = h.compression.String()
}

req, err := h.createRequest(ctx, h.url.String(), f, &options, headers)
if err != nil {
return err
}

res, err := h.executeRequest(req)
if err != nil {
return err
}
defer discardAndClose(res.Body)

return nil
}

// encodingFromFileName returns the HTTP Content-Encoding implied by the
// extension of fileName, or "" when the extension is not recognised.
//
// Matching is case-insensitive, so "DATA.TSV.GZ" is detected the same way as
// "data.tsv.gz". No recognised extension is a suffix of another, so the order
// of the cases below does not affect the result.
func encodingFromFileName(fileName string) string {
	lowered := strings.ToLower(fileName)
	endsWith := func(exts ...string) bool {
		for _, ext := range exts {
			if strings.HasSuffix(lowered, ext) {
				return true
			}
		}
		return false
	}

	switch {
	case endsWith(".gz", ".gzip"):
		return "gzip"
	case endsWith(".br", ".brotli"):
		return "br"
	case endsWith(".deflate"):
		return "deflate"
	case endsWith(".xz"):
		return "xz"
	case endsWith(".zst", ".zstd"):
		return "zstd"
	case endsWith(".lz", ".lz4"):
		return "lz4"
	case endsWith(".bz2"):
		return "bz2"
	case endsWith(".snappy"):
		return "snappy"
	}
	return ""
}

// formatClauseRe matches the keyword FORMAT (case-insensitive, as a whole word)
// followed by a format identifier, which is captured in group 1. It is compiled
// once at package scope rather than on every call.
var formatClauseRe = regexp.MustCompile(`(?i)\bformat\b\s*([A-Za-z0-9_]+)`)

// parseFormatFromSQL returns the format name following the first FORMAT keyword
// in query, with its original casing, or "" when query has no FORMAT clause.
func parseFormatFromSQL(query string) string {
	if m := formatClauseRe.FindStringSubmatch(query); len(m) > 1 {
		return m[1]
	}
	return ""
}

// contentTypeFromFormat returns the HTTP Content-Type to send for a payload in
// the given ClickHouse format. The format name is compared case-insensitively.
// Any format not listed below, including the empty string, yields
// "application/octet-stream".
func contentTypeFromFormat(format string) string {
	// Each row lists a format followed by its aliases.
	groups := []struct {
		contentType string
		names       []string
	}{
		{
			contentType: "text/tab-separated-values",
			names: []string{
				"TabSeparated", "TSV",
				"TabSeparatedRaw", "TSVRaw", "Raw",
				"TabSeparatedWithNames", "TSVWithNames",
				"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes",
				"TabSeparatedRawWithNames", "TSVRawWithNames", "RawWithNames",
				"TabSeparatedRawWithNamesAndTypes", "TSVRawWithNamesAndTypes", "RawWithNamesAndTypes",
			},
		},
		{
			contentType: "text/csv",
			names:       []string{"CSV", "CSVWithNames", "CSVWithNamesAndTypes"},
		},
		{
			contentType: "application/json",
			names: []string{
				"JSON", "JSONAsString", "JSONAsObject", "JSONStrings", "JSONColumns", "JSONColumnsWithMetadata", "JSONObjectEachRow",
				"JSONEachRow", "PrettyJSONEachRow", "JSONEachRowWithProgress", "JSONStringsEachRow", "JSONStringsEachRowWithProgress",
				"JSONCompact", "JSONCompactStrings", "JSONCompactColumns", "JSONCompactEachRow", "JSONCompactEachRowWithNames",
				"JSONCompactEachRowWithNamesAndTypes", "JSONCompactEachRowWithProgress", "JSONCompactStringsEachRow",
				"JSONCompactStringsEachRowWithNames", "JSONCompactStringsEachRowWithNamesAndTypes", "JSONCompactStringsEachRowWithProgress",
			},
		},
	}

	for _, group := range groups {
		for _, name := range group.names {
			if strings.EqualFold(name, format) {
				return group.contentType
			}
		}
	}

	return "application/octet-stream"
}
10 changes: 10 additions & 0 deletions conn_insert_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package clickhouse

import (
"context"
"fmt"
)

// insertFile is the native-protocol counterpart of the HTTP file upload. It
// performs no work and always returns an error, because file upload is not
// implemented for this connector.
func (c *connect) insertFile(ctx context.Context, filePath, query string) error {
	err := fmt.Errorf("InsertFile is not implemented for Native connector")
	return err
}
4 changes: 4 additions & 0 deletions conn_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ func (m *mockTransport) exec(ctx context.Context, query string, args ...any) err
return nil
}

// insertFile is a no-op test stub: it ignores its arguments and always
// reports success.
func (m *mockTransport) insertFile(ctx context.Context, file, query string) (err error) {
	return err
}

func (m *mockTransport) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
return nil
}
Expand Down
24 changes: 24 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package clickhouse

import (
"context"
"fmt"
"maps"
"slices"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/ext"
Expand Down Expand Up @@ -57,6 +59,8 @@ type (
userLocation *time.Location
columnNamesAndTypes []ColumnNameAndType
clientInfo ClientInfo
fileContentType string
fileEncoding string
}
)

Expand Down Expand Up @@ -189,6 +193,26 @@ func WithUserLocation(location *time.Location) QueryOption {
}
}

// WithFileContentType returns a QueryOption that sets the Content-Type used for
// file-upload HTTP requests (e.g. "text/tab-separated-values"). The value is
// stored lower-cased.
func WithFileContentType(ct string) QueryOption {
	apply := func(opts *QueryOptions) error {
		lowered := strings.ToLower(ct)
		opts.fileContentType = lowered
		return nil
	}
	return apply
}

// WithFileEncoding returns a QueryOption that sets the Content-Encoding used for
// file-upload HTTP requests (e.g. "zstd", "gzip"). The value is lower-cased and
// must be a key of contentEncodingExtensions; otherwise applying the option
// returns an error and leaves the options untouched.
func WithFileEncoding(encoding string) QueryOption {
	apply := func(opts *QueryOptions) error {
		normalized := strings.ToLower(encoding)
		_, supported := contentEncodingExtensions[normalized]
		if supported {
			opts.fileEncoding = normalized
			return nil
		}
		return fmt.Errorf("unsupported file content encoding: %s", encoding)
	}
	return apply
}

func ignoreExternalTables() QueryOption {
return func(o *QueryOptions) error {
o.external = nil
Expand Down
2 changes: 1 addition & 1 deletion lib/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type (
QueryRow(ctx context.Context, query string, args ...any) Row
PrepareBatch(ctx context.Context, query string, opts ...PrepareBatchOption) (Batch, error)
Exec(ctx context.Context, query string, args ...any) error

InsertFile(ctx context.Context, filePath string, query string) error
// Deprecated: use context aware `WithAsync()` for any async operations
AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error
Ping(context.Context) error
Expand Down
Loading