Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,31 @@ We have following examples to show Async Insert in action.
Available options:
- [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - after PrepareBatch connection will be returned to the pool. It can help you make a long-lived batch.

## Upload File

The `UploadFile` method allows uploading a file directly to ClickHouse without re-packing or parsing its contents.
If the file is already compressed using a ClickHouse-supported compression algorithm, it will be sent as-is.

This upload method significantly reduces client-side resource usage and is designed for data processing pipelines
where a pre-generated file is received and must be ingested into ClickHouse with minimal overhead.

**This method is supported only by the HTTP connector**.
Attempts to use it with the native protocol will return an error.


In most cases, the file encoding can be set automatically from the file name (`clickhouse.WithFileEncoding()`)
and the Content-Type based on the INSERT FORMAT from the query:
```
INSERT INTO <table> FORMAT <format>
```
or the extended form:
```
INSERT INTO <table> (col1, col2) SETTINGS <setting>=<value> FORMAT <format>
```

If needed, `Content-Type` and `Content-Encoding` can be explicitly overridden using
`WithFileContentType()` and `WithEncoding()`.

## Benchmark

| [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) |
Expand Down Expand Up @@ -369,6 +394,7 @@ go get -u github.com/ClickHouse/clickhouse-go/v2
* [query parameters](examples/clickhouse_api/query_parameters.go)
* [bind params](examples/clickhouse_api/bind.go) (deprecated in favour of native query parameters)
* [client info](examples/clickhouse_api/client_info.go)
* [upload file](examples/clickhouse_api/upload_file.go)

### std `database/sql` interface

Expand Down
38 changes: 38 additions & 0 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"math/rand"
"sync"
Expand Down Expand Up @@ -89,6 +90,7 @@ type nativeTransport interface {
prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error)
exec(ctx context.Context, query string, args ...any) error
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
uploadFile(ctx context.Context, reader io.Reader, query string) error
ping(context.Context) error
isBad() bool
connID() int
Expand Down Expand Up @@ -216,6 +218,42 @@ func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool,
return nil
}

// UploadFile streams the contents of reader directly to ClickHouse as the body of an
// INSERT request.
//
// The data is sent "as-is": the driver performs no decompression, recompression,
// parsing, or re-packing on the client side. This method is intended for
// "INSERT ... FORMAT <fmt>" queries where the payload is already prepared and
// optionally compressed (e.g. a zstd-compressed TSV file).
//
// Content-Type / Content-Encoding handling:
//   - Content-Type defaults to a value derived from the FORMAT clause of the query
//     and can be overridden with WithFileContentType().
//   - Content-Encoding is sent only when supplied via WithEncoding() (explicit value)
//     or WithFileEncoding() (auto-detected from a file name extension,
//     e.g. ".zst" → "zstd", ".gz" → "gzip").
//   - The driver does NOT attempt to decode, encode, or transform the stream.
//
// Parameters:
//   - ctx: request context (cancellation, deadlines, query options).
//   - reader: source of the request body; it is streamed and not buffered in memory.
//   - query: ClickHouse INSERT query (typically "INSERT INTO <table> FORMAT <format>").
//
// Limitations:
//   - External tables are not supported for file uploads.
//   - This method is available only for the HTTP transport; the native protocol
//     returns an error.
//
// Typical usage:
//
//	f, _ := os.Open("data.tsv.zst")
//	err := conn.UploadFile(ctx, f, "INSERT INTO db.table FORMAT TSV")
//
// On success, the file contents are fully consumed and the response body is discarded.
func (ch *clickhouse) UploadFile(ctx context.Context, reader io.Reader, query string) (err error) {
	conn, err := ch.acquire(ctx)
	if err != nil {
		return err
	}
	// Wrap in a closure so err is read at release time; a plain
	// `defer ch.release(conn, err)` would capture the nil err that is current
	// when the defer statement executes, and upload failures would never be
	// reported to the pool.
	defer func() { ch.release(conn, err) }()
	conn.getLogger().Debug("uploadFile", slog.String("sql", query))
	err = conn.uploadFile(ctx, reader, query)
	return err
}

func (ch *clickhouse) Ping(ctx context.Context) (err error) {
conn, err := ch.acquire(ctx)
if err != nil {
Expand Down
128 changes: 128 additions & 0 deletions conn_http_upload_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package clickhouse

import (
"context"
"fmt"
"io"
"regexp"
"strings"
)

// contentEncodingExtensions maps an HTTP Content-Encoding value to the file-name
// extensions it is auto-detected from (see WithFileEncoding). The uploaded stream
// itself is never transformed; the extension only determines the header value sent
// to the server.
var contentEncodingExtensions = map[string][]string{
	"gzip":    {".gz", ".gzip"},
	"br":      {".br", ".brotli"},
	"deflate": {".deflate"},
	"xz":      {".xz"},
	"zstd":    {".zst", ".zstd"},
	"lz4":     {".lz", ".lz4"},
	"bz2":     {".bz2"},
	"snappy":  {".snappy"},
}

// uploadFile streams a local file directly to ClickHouse over HTTP as the request body.
//
// The file is sent "as-is" without any decompression or recompression on the client side.
// This method is intended for INSERT ... FORMAT <fmt> queries where the payload is already
// prepared and optionally compressed (e.g. TSV.zst).
//
// Compression handling:
// - If contentEncoding is explicitly provided, it is set as HTTP "Content-Encoding".
// - If contentEncoding is empty, the encoding is auto-detected from the file extension
// (e.g. ".zst" → "zstd", ".gz" → "gzip").
// - The driver does NOT attempt to decode, encode, or transform the stream.
//
// Parameters:
// - ctx: request context (cancellation, deadlines).
// - filePath: path to the file to upload; the file is streamed and not buffered in memory.
// - query: ClickHouse INSERT query (typically "INSERT INTO <table> FORMAT <format>").
//
// Limitations:
// - External tables are not supported for file uploads.
// - This method is available only for the HTTP transport.
//
// Typical usage:
// err := conn.uploadFile(ctx, "data.tsv.zst", "text/tab-separated-values", "zstd", "INSERT INTO db.table FORMAT TSV")
//
// On success, the file contents are fully consumed and the request body is discarded.
func (h *httpConnect) uploadFile(ctx context.Context, reader io.Reader, query string) error {
options := queryOptions(ctx)
options.settings["query"] = query
Comment thread
cl-bvl marked this conversation as resolved.

if len(options.external) > 0 {
return fmt.Errorf("external tables are not supported for file upload")
}
if options.fileContentType == "" {
options.fileContentType = contentTypeFromFormat(parseFormatFromSQL(query))
if options.fileContentType == "" {
return fmt.Errorf("unknown file Content-Type")
}
}

headers := map[string]string{"Content-Type": options.fileContentType}
if options.fileEncoding != "" {
headers["Content-Encoding"] = options.fileEncoding
}

switch h.compression {
case CompressionZSTD, CompressionLZ4:
options.settings["compress"] = "1"
case CompressionGZIP, CompressionDeflate, CompressionBrotli:
// request encoding
headers["Accept-Encoding"] = h.compression.String()
}

req, err := h.createRequest(ctx, h.url.String(), reader, &options, headers)
if err != nil {
return err
}

res, err := h.executeRequest(req)
if err != nil {
return err
}
defer discardAndClose(res.Body)

return nil
}



func parseFormatFromSQL(query string) string {
var re = regexp.MustCompile(`(?i)\bformat\b\s*([A-Za-z0-9_]+)`)
m := re.FindStringSubmatch(query)
if len(m) > 1 {
return m[1]
}
return ""
}

// contentTypeByFormat maps a lower-cased ClickHouse format name to the HTTP
// Content-Type used when uploading a file in that format. Built once at package
// init instead of on every call.
var contentTypeByFormat = func() map[string]string {
	groups := map[string][]string{
		"text/tab-separated-values": {
			"TabSeparated", "TSV",
			"TabSeparatedRaw", "TSVRaw", "Raw",
			"TabSeparatedWithNames", "TSVWithNames",
			"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes",
			"TabSeparatedRawWithNames", "TSVRawWithNames", "RawWithNames",
			"TabSeparatedRawWithNamesAndTypes", "TSVRawWithNamesAndTypes", "RawWithNamesAndTypes",
		},
		"text/csv": {"CSV", "CSVWithNames", "CSVWithNamesAndTypes"},
		"application/json": {
			"JSON", "JSONAsString", "JSONAsObject", "JSONStrings", "JSONColumns", "JSONColumnsWithMetadata", "JSONObjectEachRow",
			"JSONEachRow", "PrettyJSONEachRow", "JSONEachRowWithProgress", "JSONStringsEachRow", "JSONStringsEachRowWithProgress",
			"JSONCompact", "JSONCompactStrings", "JSONCompactColumns", "JSONCompactEachRow", "JSONCompactEachRowWithNames",
			"JSONCompactEachRowWithNamesAndTypes", "JSONCompactEachRowWithProgress", "JSONCompactStringsEachRow",
			"JSONCompactStringsEachRowWithNames", "JSONCompactStringsEachRowWithNamesAndTypes", "JSONCompactStringsEachRowWithProgress",
		},
	}
	m := make(map[string]string)
	for contentType, names := range groups {
		for _, name := range names {
			m[strings.ToLower(name)] = contentType
		}
	}
	return m
}()

// contentTypeFromFormat returns the HTTP Content-Type for a ClickHouse format name
// (case-insensitive). Unknown formats fall back to "application/octet-stream".
func contentTypeFromFormat(format string) string {
	if ct, ok := contentTypeByFormat[strings.ToLower(format)]; ok {
		return ct
	}
	return "application/octet-stream"
}
8 changes: 7 additions & 1 deletion conn_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package clickhouse

import (
"context"
"io"
"log/slog"
"sync"
"testing"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

func TestConnPool_Cap(t *testing.T) {
Expand Down Expand Up @@ -498,6 +500,10 @@ func (m *mockTransport) exec(ctx context.Context, query string, args ...any) err
return nil
}

// uploadFile satisfies the nativeTransport interface for pool tests; it is a no-op.
func (m *mockTransport) uploadFile(ctx context.Context, reader io.Reader, query string) error {
	return nil
}

func (m *mockTransport) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
return nil
}
Expand Down
11 changes: 11 additions & 0 deletions conn_upload_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package clickhouse

import (
"context"
"fmt"
"io"
)

// uploadFile is the nativeTransport stub for the native (TCP) protocol. File upload
// is an HTTP-only feature, so this always returns an error; see httpConnect.uploadFile
// for the working implementation.
func (c *connect) uploadFile(ctx context.Context, reader io.Reader, query string) error {
	return fmt.Errorf("UploadFile is not implemented for Native connector")
}
56 changes: 55 additions & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package clickhouse

import (
"context"
"fmt"
"maps"
"path/filepath"
"slices"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/ext"
"go.opentelemetry.io/otel/trace"

"github.com/ClickHouse/clickhouse-go/v2/ext"
)

var _contextOptionKey = &QueryOptions{
Expand Down Expand Up @@ -57,6 +61,8 @@ type (
userLocation *time.Location
columnNamesAndTypes []ColumnNameAndType
clientInfo ClientInfo
fileContentType string
fileEncoding string
}
)

Expand Down Expand Up @@ -189,6 +195,52 @@ func WithUserLocation(location *time.Location) QueryOption {
}
}

// WithFileContentType sets the Content-Type header for upload HTTP requests
// (e.g. "text/tab-separated-values"). The value is normalized to lower case,
// which is safe because HTTP media types are case-insensitive.
func WithFileContentType(ct string) QueryOption {
	return func(o *QueryOptions) error {
		o.fileContentType = strings.ToLower(ct)
		return nil
	}
}

// WithEncoding sets the Content-Encoding header for upload HTTP requests
// (e.g. "zstd", "gzip"). The value is validated against the set of encodings the
// driver recognizes; unsupported values return an error. To derive the encoding
// from a file name instead, use WithFileEncoding.
func WithEncoding(encoding string) QueryOption {
	return func(o *QueryOptions) error {
		enc := strings.ToLower(encoding)
		if _, ok := contentEncodingExtensions[enc]; !ok {
			return fmt.Errorf("unsupported file content encoding: %s", encoding)
		}
		o.fileEncoding = enc
		return nil
	}
}

// WithFileEncoding derives the Content-Encoding header for upload HTTP requests
// from the file name's extension (e.g. "data.tsv.zst" → "zstd",
// "data.csv.gz" → "gzip"). It returns an error when the extension does not match
// any supported encoding. To set the encoding explicitly, use WithEncoding.
func WithFileEncoding(filename string) QueryOption {
	return func(o *QueryOptions) error {
		// Reuse the shared extension table so auto-detection stays consistent
		// with the encodings accepted by WithEncoding, instead of maintaining a
		// duplicate inline copy of the mapping.
		for encoding, extensions := range contentEncodingExtensions {
			for _, ext := range extensions {
				if strings.HasSuffix(filename, ext) {
					o.fileEncoding = encoding
					return nil
				}
			}
		}

		return fmt.Errorf("unsupported content encoding ext: %s", filepath.Base(filename))
	}
}

func ignoreExternalTables() QueryOption {
return func(o *QueryOptions) error {
o.external = nil
Expand Down Expand Up @@ -315,6 +367,8 @@ func (q *QueryOptions) clone() QueryOptions {
blockBufferSize: q.blockBufferSize,
userLocation: q.userLocation,
columnNamesAndTypes: nil,
fileContentType: q.fileContentType,
fileEncoding: q.fileEncoding,
}

if q.settings != nil {
Expand Down
Loading