diff --git a/README.md b/README.md index f2abf41020..9f12ad197f 100644 --- a/README.md +++ b/README.md @@ -336,6 +336,31 @@ We have following examples to show Async Insert in action. Available options: - [WithReleaseConnection](examples/clickhouse_api/batch_release_connection.go) - after PrepareBatch connection will be returned to the pool. It can help you make a long-lived batch. +## Upload File + +The `UploadFile` method allows uploading a file directly to ClickHouse without re-packing or parsing its contents. +If the file is already compressed using a ClickHouse-supported compression algorithm, it will be sent as-is. + +This upload method significantly reduces client-side resource usage and is designed for data processing pipelines +where a pre-generated file is received and must be ingested into ClickHouse with minimal overhead. + +**This method is supported only by the HTTP connector**. +Attempts to use it with the native protocol will return an error. + + +In most cases, the file encoding can be set automatically from the file name (`clickhouse.WithFileEncoding()`) +and the Content-Type based on the INSERT FORMAT from the query: +``` +INSERT INTO FORMAT +``` +or the extended form: +``` +INSERT INTO
(col1, col2) SETTINGS = FORMAT +``` + +If needed, `Content-Type` and `Content-Encoding` can be explicitly overridden using +`WithFileContentType()` and `WithFileEncoding()`. + ## Benchmark | [V2 (READ) std](benchmark/v2/read/main.go) | [V2 (READ) clickhouse API](benchmark/v2/read-native/main.go) | @@ -369,6 +394,7 @@ go get -u github.com/ClickHouse/clickhouse-go/v2 * [query parameters](examples/clickhouse_api/query_parameters.go) * [bind params](examples/clickhouse_api/bind.go) (deprecated in favour of native query parameters) * [client info](examples/clickhouse_api/client_info.go) +* [upload file](examples/clickhouse_api/upload_file.go) ### std `database/sql` interface diff --git a/clickhouse.go b/clickhouse.go index b89aaa3a0d..ab8facedf1 100644 --- a/clickhouse.go +++ b/clickhouse.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "math/rand" "sync" @@ -89,6 +90,7 @@ type nativeTransport interface { prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error) exec(ctx context.Context, query string, args ...any) error asyncInsert(ctx context.Context, query string, wait bool, args ...any) error + uploadFile(ctx context.Context, reader io.Reader, query string) error ping(context.Context) error isBad() bool connID() int @@ -216,6 +218,42 @@ func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool, return nil } +// UploadFile streams a local file directly to ClickHouse over HTTP as the request body. +// +// The file is sent "as-is" without any decompression or recompression on the client side. +// This method is intended for INSERT ... FORMAT queries where the payload is already +// prepared and optionally compressed (e.g. TSV.zst). +// +// Compression handling: +// - If contentEncoding is explicitly provided, it is set as HTTP "Content-Encoding". 
+// - If contentEncoding is empty, the encoding is auto-detected from the file extension +// (e.g. ".zst" → "zstd", ".gz" → "gzip"). +// - The driver does NOT attempt to decode, encode, or transform the stream. +// +// Parameters: +// - ctx: request context (cancellation, deadlines). +// - reader: source of the payload to upload; the data is streamed and not buffered in memory. +// - query: ClickHouse INSERT query (typically "INSERT INTO
FORMAT "). +// +// Limitations: +// - External tables are not supported for file uploads. +// - This method is available only for the HTTP transport. +// +// Typical usage: +// err := conn.UploadFile(ctx, "data.tsv.zstd", "INSERT INTO db.table FORMAT TSV") +// +// On success, the file contents are fully consumed and the request body is discarded. +func (ch *clickhouse) UploadFile(ctx context.Context, reader io.Reader, query string) (err error) { + conn, err := ch.acquire(ctx) + if err != nil { + return err + } + defer ch.release(conn, err) + conn.getLogger().Debug("uploadFile", slog.String("sql", query)) + err = conn.uploadFile(ctx, reader, query) + return err +} + func (ch *clickhouse) Ping(ctx context.Context) (err error) { conn, err := ch.acquire(ctx) if err != nil { diff --git a/conn_http_upload_file.go b/conn_http_upload_file.go new file mode 100644 index 0000000000..c14679d759 --- /dev/null +++ b/conn_http_upload_file.go @@ -0,0 +1,128 @@ +package clickhouse + +import ( + "context" + "fmt" + "io" + "regexp" + "strings" +) + +var contentEncodingExtensions = map[string][]string{ + "gzip": {".gz", ".gzip"}, + "br": {".br", ".brotli"}, + "deflate": {".deflate"}, + "xz": {".xz"}, + "zstd": {".zst", ".zstd"}, + "lz4": {".lz", ".lz4"}, + "bz2": {".bz2"}, + "snappy": {".snappy"}, +} + +// uploadFile streams a local file directly to ClickHouse over HTTP as the request body. +// +// The file is sent "as-is" without any decompression or recompression on the client side. +// This method is intended for INSERT ... FORMAT queries where the payload is already +// prepared and optionally compressed (e.g. TSV.zst). +// +// Compression handling: +// - If contentEncoding is explicitly provided, it is set as HTTP "Content-Encoding". +// - If contentEncoding is empty, the encoding is auto-detected from the file extension +// (e.g. ".zst" → "zstd", ".gz" → "gzip"). +// - The driver does NOT attempt to decode, encode, or transform the stream. 
+// +// Parameters: +// - ctx: request context (cancellation, deadlines). +// - reader: source of the payload to upload; the data is streamed and not buffered in memory. +// - query: ClickHouse INSERT query (typically "INSERT INTO
FORMAT "). +// +// Limitations: +// - External tables are not supported for file uploads. +// - This method is available only for the HTTP transport. +// +// Typical usage: +// err := conn.uploadFile(ctx, "data.tsv.zst", "text/tab-separated-values", "zstd", "INSERT INTO db.table FORMAT TSV") +// +// On success, the file contents are fully consumed and the request body is discarded. +func (h *httpConnect) uploadFile(ctx context.Context, reader io.Reader, query string) error { + options := queryOptions(ctx) + options.settings["query"] = query + + if len(options.external) > 0 { + return fmt.Errorf("external tables are not supported for file upload") + } + if options.fileContentType == "" { + options.fileContentType = contentTypeFromFormat(parseFormatFromSQL(query)) + if options.fileContentType == "" { + return fmt.Errorf("unknown file Content-Type") + } + } + + headers := map[string]string{"Content-Type": options.fileContentType} + if options.fileEncoding != "" { + headers["Content-Encoding"] = options.fileEncoding + } + + switch h.compression { + case CompressionZSTD, CompressionLZ4: + options.settings["compress"] = "1" + case CompressionGZIP, CompressionDeflate, CompressionBrotli: + // request encoding + headers["Accept-Encoding"] = h.compression.String() + } + + req, err := h.createRequest(ctx, h.url.String(), reader, &options, headers) + if err != nil { + return err + } + + res, err := h.executeRequest(req) + if err != nil { + return err + } + defer discardAndClose(res.Body) + + return nil +} + + + +func parseFormatFromSQL(query string) string { + var re = regexp.MustCompile(`(?i)\bformat\b\s*([A-Za-z0-9_]+)`) + m := re.FindStringSubmatch(query) + if len(m) > 1 { + return m[1] + } + return "" +} + +func contentTypeFromFormat(format string) string { + formats := map[string][]string{ + "text/tab-separated-values": { + "TabSeparated", "TSV", + "TabSeparatedRaw", "TSVRaw", "Raw", + "TabSeparatedWithNames", "TSVWithNames", "RawWithNames", + 
"TabSeparatedWithNamesAndTypes", "TSVWithNamesAndTypes", "RawWithNamesAndTypes", + "TabSeparatedRawWithNames", "TSVRawWithNames", "RawWithNames", + "TabSeparatedRawWithNamesAndTypes", "TSVRawWithNamesAndTypes", "RawWithNamesAndTypes", + }, + "text/csv": {"CSV", "CSVWithNames", "CSVWithNamesAndTypes"}, + "application/json": { + "JSON", "JSONAsString", "JSONAsObject", "JSONStrings", "JSONColumns", "JSONColumnsWithMetadata", "JSONObjectEachRow", + "JSONEachRow", "PrettyJSONEachRow", "JSONEachRowWithProgress", "JSONStringsEachRow", "JSONStringsEachRowWithProgress", + "JSONCompact", "JSONCompactStrings", "JSONCompactColumns", "JSONCompactEachRow", "JSONCompactEachRowWithNames", + "JSONCompactEachRowWithNamesAndTypes", "JSONCompactEachRowWithProgress", "JSONCompactStringsEachRow", + "JSONCompactStringsEachRowWithNames", "JSONCompactStringsEachRowWithNamesAndTypes", "JSONCompactStringsEachRowWithProgress", + }, + } + + for contentType, fmts := range formats { + for _, f := range fmts { + if strings.EqualFold(f, format) { + return contentType + } + } + } + + return "application/octet-stream" +} diff --git a/conn_pool_test.go b/conn_pool_test.go index ae7957575d..f8da7b5eb0 100644 --- a/conn_pool_test.go +++ b/conn_pool_test.go @@ -2,14 +2,16 @@ package clickhouse import ( "context" + "io" "log/slog" "sync" "testing" "time" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" ) func TestConnPool_Cap(t *testing.T) { @@ -498,6 +500,10 @@ func (m *mockTransport) exec(ctx context.Context, query string, args ...any) err return nil } +func (m *mockTransport) uploadFile(ctx context.Context, reader io.Reader, query string) error { + return nil +} + func (m *mockTransport) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error { return nil } diff --git a/conn_upload_file.go b/conn_upload_file.go new file mode 
100644 index 0000000000..279b13d637 --- /dev/null +++ b/conn_upload_file.go @@ -0,0 +1,11 @@ +package clickhouse + +import ( + "context" + "fmt" + "io" +) + +func (c *connect) uploadFile(ctx context.Context, reader io.Reader, query string) error { + return fmt.Errorf("UploadFile is not implemented for Native connector") +} diff --git a/context.go b/context.go index 5277523d96..91197fb240 100644 --- a/context.go +++ b/context.go @@ -2,12 +2,16 @@ package clickhouse import ( "context" + "fmt" "maps" + "path/filepath" "slices" + "strings" "time" - "github.com/ClickHouse/clickhouse-go/v2/ext" "go.opentelemetry.io/otel/trace" + + "github.com/ClickHouse/clickhouse-go/v2/ext" ) var _contextOptionKey = &QueryOptions{ @@ -57,6 +61,8 @@ type ( userLocation *time.Location columnNamesAndTypes []ColumnNameAndType clientInfo ClientInfo + fileContentType string + fileEncoding string } ) @@ -189,6 +195,52 @@ func WithUserLocation(location *time.Location) QueryOption { } } +// WithFileContentType set Content-Type for upload HTTP requests: (e.g. "text/tab-separated-values") +func WithFileContentType(ct string) QueryOption { + return func(o *QueryOptions) error { + o.fileContentType = strings.ToLower(ct) + return nil + } +} + +// WithFileEncoding set Content-Encoding for upload HTTP requests (e.g. 
"zstd", "gzip") +func WithEncoding(encoding string) QueryOption { + return func(o *QueryOptions) error { + enc := strings.ToLower(encoding) + if _, ok := contentEncodingExtensions[enc]; !ok && enc != "" { + return fmt.Errorf("unsupported file content encoding: %s", encoding) + } + o.fileEncoding = enc + return nil + } +} + +func WithFileEncoding(filename string) QueryOption { + return func(o *QueryOptions) error { + extMapping := map[string][]string{ + "gzip": {".gz", ".gzip"}, + "br": {".br", ".brotli"}, + "deflate": {".deflate"}, + "xz": {".xz"}, + "zstd": {".zst", ".zstd"}, + "lz4": {".lz", ".lz4"}, + "bz2": {".bz2"}, + "snappy": {".snappy"}, + } + + for encoding, extensions := range extMapping { + for _, ext := range extensions { + if strings.HasSuffix(filename, ext) { + o.fileEncoding = encoding + return nil + } + } + } + + return fmt.Errorf("unsupported content encoding ext: %s", filepath.Base(filename)) + } +} + +func ignoreExternalTables() QueryOption { + return func(o *QueryOptions) error { + o.external = nil @@ -315,6 +367,8 @@ func (q *QueryOptions) clone() QueryOptions { blockBufferSize: q.blockBufferSize, userLocation: q.userLocation, columnNamesAndTypes: nil, + fileContentType: q.fileContentType, + fileEncoding: q.fileEncoding, } if q.settings != nil { diff --git a/examples/clickhouse_api/upload_file.go b/examples/clickhouse_api/upload_file.go new file mode 100644 index 0000000000..629105bdd6 --- /dev/null +++ b/examples/clickhouse_api/upload_file.go @@ -0,0 +1,85 @@ +package clickhouse_api + +import ( + "bytes" + "context" + "os" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +func UploadFile() error { + conn, err := GetHTTPConnection("UploadFile", nil, nil, nil) + if err != nil { + return err + } + ctx := context.Background() + defer func() { + conn.Exec(ctx, "DROP TABLE test_upload_file") + }() + conn.Exec(ctx, `DROP TABLE IF EXISTS test_upload_file`) + if err = conn.Exec(ctx, ` + CREATE TABLE test_upload_file ( + File LowCardinality(FixedString(32)) + , ID 
UInt32 + , Data Nullable(FixedString(32)) + ) Engine MergeTree() ORDER BY (File, ID) + `); err != nil { + return err + } + + filePath := "./../../tests/testdata/upload_file.tsv.zstd" + f, err := os.Open(filePath) + if err != nil { + return err + } + defer f.Close() + + ctx = clickhouse.Context(ctx, clickhouse.WithFileEncoding(filePath)) + err = conn.UploadFile(ctx, f, "INSERT INTO test_upload_file FORMAT TSV") + if err != nil { + return err + } + + return nil +} + +func UploadFileReader() error { + conn, err := GetHTTPConnection("UploadFileReader", nil, nil, nil) + if err != nil { + return err + } + ctx := context.Background() + defer func() { + conn.Exec(ctx, "DROP TABLE test_upload_file") + }() + conn.Exec(ctx, `DROP TABLE IF EXISTS test_upload_file`) + if err = conn.Exec(ctx, ` + CREATE TABLE test_upload_file ( + File LowCardinality(FixedString(32)) + , ID UInt32 + , Data Nullable(FixedString(32)) + ) Engine MergeTree() ORDER BY (File, ID) + `); err != nil { + return err + } + + const data = `{"File": "upload_file.json", "ID": 1, "Data": null} +{"File": "upload_file.json", "ID": 2, "Data": "ASD"} +{"File": "upload_file.json", "ID": 3, "Data": "ASD"} +{"File": "upload_file.json", "ID": 4, "Data": "QWE"} +{"File": "upload_file.json", "ID": 5, "Data": "Foo"} +` + buf := bytes.NewBufferString(data) + + // The data is uncompressed, so the Encoding parameter value is empty. It's empty by default; we've set it explicitly for clarity only. + // Content-Type is also set automatically based on the request. However, we can set it manually if necessary. 
+ ctx = clickhouse.Context(ctx, clickhouse.WithEncoding(""), clickhouse.WithFileContentType("application/json")) + + err = conn.UploadFile(ctx, buf, "INSERT INTO test_upload_file FORMAT JSONEachRow") + if err != nil { + return err + } + + return nil +} diff --git a/lib/driver/driver.go b/lib/driver/driver.go index 8ebf639fc7..f403e1ccc0 100644 --- a/lib/driver/driver.go +++ b/lib/driver/driver.go @@ -2,6 +2,7 @@ package driver import ( "context" + "io" "reflect" "time" @@ -40,7 +41,7 @@ type ( QueryRow(ctx context.Context, query string, args ...any) Row PrepareBatch(ctx context.Context, query string, opts ...PrepareBatchOption) (Batch, error) Exec(ctx context.Context, query string, args ...any) error - + UploadFile(ctx context.Context, reader io.Reader, query string) error // Deprecated: use context aware `WithAsync()` for any async operations AsyncInsert(ctx context.Context, query string, wait bool, args ...any) error Ping(context.Context) error diff --git a/tests/testdata/upload_file.csv.gz b/tests/testdata/upload_file.csv.gz new file mode 100644 index 0000000000..cda4e4046a Binary files /dev/null and b/tests/testdata/upload_file.csv.gz differ diff --git a/tests/testdata/upload_file.json b/tests/testdata/upload_file.json new file mode 100644 index 0000000000..fa7fd51f43 --- /dev/null +++ b/tests/testdata/upload_file.json @@ -0,0 +1,5 @@ +{"File": "upload_file.json", "ID": 1, "Data": null} +{"File": "upload_file.json", "ID": 2, "Data": "ASD"} +{"File": "upload_file.json", "ID": 3, "Data": "ASD"} +{"File": "upload_file.json", "ID": 4, "Data": "QWE"} +{"File": "upload_file.json", "ID": 5, "Data": "Foo"} \ No newline at end of file diff --git a/tests/testdata/upload_file.tsv.zstd b/tests/testdata/upload_file.tsv.zstd new file mode 100644 index 0000000000..c4d6382346 Binary files /dev/null and b/tests/testdata/upload_file.tsv.zstd differ diff --git a/tests/upload_file_test.go b/tests/upload_file_test.go new file mode 100644 index 0000000000..c35563b86a --- /dev/null 
+++ b/tests/upload_file_test.go @@ -0,0 +1,92 @@ +package tests + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ClickHouse/clickhouse-go/v2" +) + +func TestUploadFile(t *testing.T) { + + TestProtocols(t, func(t *testing.T, protocol clickhouse.Protocol) { + if protocol == clickhouse.Native { + return + } + conn, err := GetNativeConnection(t, protocol, nil, nil, &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }) + require.NoError(t, err) + + ctx := context.Background() + const ddl = ` + CREATE TABLE test_upload_file ( + File LowCardinality(FixedString(32)) + , ID UInt32 + , Data Nullable(FixedString(32)) + + ) Engine MergeTree() ORDER BY (File, ID) + ` + + require.NoError(t, conn.Exec(ctx, ddl)) + defer func() { + conn.Exec(ctx, "DROP TABLE IF EXISTS test_upload_file") + }() + + tests := []struct { + Filename string + Format string + ContentType string + Encoding string + Count uint64 + }{ + { + Filename: "./testdata/upload_file.tsv.zstd", + Format: "tsV", + Count: 3, + }, + { + Filename: "./testdata/upload_file.csv.gz", + Format: "CSV", + ContentType: "text/plain; charset=utf-8", + Encoding: "gzip", + Count: 4, + }, + { + Filename: "./testdata/upload_file.json", + Format: "JSONEachRow", + ContentType: "application/json; charset=utf-8", + Count: 5, + }, + } + for _, test := range tests { + opts := make([]clickhouse.QueryOption, 0, 4) + if test.ContentType != "" { + opts = append(opts, clickhouse.WithFileContentType(test.ContentType)) + } + if test.Encoding != "" { + opts = append(opts, clickhouse.WithEncoding(test.Encoding)) + } else { + opts = append(opts, clickhouse.WithFileEncoding(test.Filename)) + } + f, err := os.Open(test.Filename) + require.NoError(t, err, test) + defer f.Close() + + cc := clickhouse.Context(ctx, opts...) 
+ + err = conn.UploadFile(cc, f, fmt.Sprintf("INSERT INTO test_upload_file FORMAT %s", test.Format)) + require.NoError(t, err, test) + + var count uint64 + require.NoError(t, conn.QueryRow(cc, "SELECT Count(*) FROM test_upload_file WHERE File = ?", filepath.Base(test.Filename)).Scan(&count)) + assert.Equal(t, test.Count, count, "Wrong lines count for file %s", test.Filename) + } + }) +}