diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index d367a44ab7..654020d791 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -199,7 +199,7 @@ func ProcessFacetsRequest(ctx context.Context, w http.ResponseWriter, r *http.Re h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write response WriteFacetsResponse(w, m) @@ -310,7 +310,7 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write response WriteHitsSeries(w, m) @@ -450,7 +450,7 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write results WriteValuesWithHitsJSON(w, fieldNames) @@ -498,7 +498,7 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write results WriteValuesWithHitsJSON(w, values) @@ -532,7 +532,7 @@ func ProcessStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter, h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write results WriteValuesWithHitsJSON(w, names) @@ -580,7 +580,7 @@ func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter, h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write results 
WriteValuesWithHitsJSON(w, values) @@ -618,7 +618,7 @@ func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write results WriteValuesWithHitsJSON(w, streamIDs) @@ -656,7 +656,7 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write results WriteValuesWithHitsJSON(w, streams) @@ -1030,7 +1030,7 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write response WriteStatsQueryRangeResponse(w, rows) @@ -1153,7 +1153,7 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt h := w.Header() h.Set("Content-Type", "application/json") - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) // Write response WriteStatsQueryResponse(w, rows) @@ -1245,6 +1245,9 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } }() + qctx := ca.newQueryContext(ctx) + defer ca.updatePerQueryStatsMetrics() + startTime := time.Now() writeResponseHeadersOnce := sync.OnceFunc(func() { // Write response headers @@ -1255,7 +1258,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } else { h.Set("Content-Type", "application/stream+json") } - ca.writeResponseHeaders(h, startTime) + ca.writeResponseHeaders(h, qctx.QueryStats, startTime) if format == "csv" { _, _ = sw.Write(csvHeader) @@ -1293,9 +1296,6 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } } - qctx := 
ca.newQueryContext(ctx) - defer ca.updatePerQueryStatsMetrics() - // Execute the query if err := vlstorage.RunQuery(qctx, writeBlock); err != nil { httpserver.Errorf(w, r, "cannot execute query [%s]: %s", ca.q, err) @@ -1808,9 +1808,9 @@ func getStringSliceFromRequest(r *http.Request, argName string) ([]string, error return a, nil } -func (ca *commonArgs) writeResponseHeaders(h http.Header, startTime time.Time) { +func (ca *commonArgs) writeResponseHeaders(h http.Header, qs *logstorage.QueryStats, startTime time.Time) { // Write request duration - accessControlExposeHeaders := []string{"VL-Request-Duration-Seconds"} + accessControlExposeHeaders := []string{"VL-Request-Duration-Seconds", "VL-Partial-Response"} h.Set("VL-Request-Duration-Seconds", fmt.Sprintf("%.3f", time.Since(startTime).Seconds())) if len(ca.tenantIDs) == 1 { @@ -1821,12 +1821,32 @@ func (ca *commonArgs) writeResponseHeaders(h http.Header, startTime time.Time) { h.Set("ProjectID", fmt.Sprintf("%d", tenantID.ProjectID)) } + qc := qs.QueryCompleteness + h.Set("VL-Partial-Response", getPartialResponseHeaderValue(qc)) + for i, v := range accessControlExposeHeaders { accessControlExposeHeaders[i] = http.CanonicalHeaderKey(v) } h.Set("Access-Control-Expose-Headers", strings.Join(accessControlExposeHeaders, ", ")) } +func getPartialResponseHeaderValue(qc logstorage.QueryCompleteness) string { + switch qc { + case logstorage.QueryCompletenessNotSet: + // QueryCompleteness is not set, which means the response is flushed without waiting for the full response. + // Treat this as "unknown". 
+ return "unknown" + case logstorage.QueryCompletenessPartial: + return "true" + case logstorage.QueryCompletenessUnknown: + return "unknown" + case logstorage.QueryCompletenessComplete: + return "false" + default: + panic(fmt.Errorf("BUG: unexpected QueryCompleteness value %d", qc)) + } +} + func parseDuration(r *http.Request, argName, defaultValue string) (int64, error) { s := r.FormValue(argName) if s == "" { diff --git a/app/vlstorage/netselect/netselect.go b/app/vlstorage/netselect/netselect.go index 924c07411c..9ba65f4bd9 100644 --- a/app/vlstorage/netselect/netselect.go +++ b/app/vlstorage/netselect/netselect.go @@ -129,19 +129,16 @@ func newStorageNode(s *Storage, addr string, ac *promauth.Config, isTLS bool) *s return sn } -func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func(db *logstorage.DataBlock)) error { - args := sn.getCommonArgs(QueryProtocolVersion, qctx) - - qsLocal := &logstorage.QueryStats{} - defer qctx.QueryStats.UpdateAtomic(qsLocal) - +func (sn *storageNode) runQuery(ctx context.Context, args url.Values, processBlock func(db *logstorage.DataBlock)) (*logstorage.QueryStats, error) { path := "/internal/select/query" - responseBody, reqURL, err := sn.getResponseBodyForPathAndArgs(qctx.Context, path, args) + responseBody, reqURL, err := sn.getResponseBodyForPathAndArgs(ctx, path, args) if err != nil { - return err + return nil, err } defer responseBody.Close() + qs := &logstorage.QueryStats{} + // read the response var dataLenBuf [8]byte var buf []byte @@ -151,18 +148,18 @@ func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func if _, err := io.ReadFull(responseBody, dataLenBuf[:]); err != nil { if errors.Is(err, io.EOF) { // The end of response stream - return nil + return qs, nil } - return fmt.Errorf("cannot read block size from %q: %w", reqURL, err) + return nil, fmt.Errorf("cannot read block size from %q: %w", reqURL, err) } blockLen := encoding.UnmarshalUint64(dataLenBuf[:]) if blockLen 
> math.MaxInt { - return fmt.Errorf("too big data block read from %q: %d bytes; mustn't exceed %v bytes", reqURL, blockLen, math.MaxInt) + return nil, fmt.Errorf("too big data block read from %q: %d bytes; mustn't exceed %v bytes", reqURL, blockLen, math.MaxInt) } buf = slicesutil.SetLength(buf, int(blockLen)) if _, err := io.ReadFull(responseBody, buf); err != nil { - return fmt.Errorf("cannot read block with size of %d bytes from %q: %w", blockLen, reqURL, err) + return nil, fmt.Errorf("cannot read block with size of %d bytes from %q: %w", blockLen, reqURL, err) } src := buf @@ -171,7 +168,7 @@ func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func var err error buf, err = encoding.DecompressZSTD(buf, buf) if err != nil { - return fmt.Errorf("cannot decompress data block: %w", err) + return nil, fmt.Errorf("cannot decompress data block: %w", err) } src = buf[bufLen:] } @@ -181,9 +178,9 @@ func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func src = src[1:] if isQueryStatsBlock { - tail, err := unmarshalQueryStats(qsLocal, src) + tail, err := unmarshalQueryStats(qs, src) if err != nil { - return fmt.Errorf("cannot unmarshal query stats received from %q: %w", reqURL, err) + return nil, fmt.Errorf("cannot unmarshal query stats received from %q: %w", reqURL, err) } src = tail continue @@ -191,7 +188,7 @@ func (sn *storageNode) runQuery(qctx *logstorage.QueryContext, processBlock func tail, vb, err := db.UnmarshalInplace(src, valuesBuf[:0]) if err != nil { - return fmt.Errorf("cannot unmarshal data block received from %q: %w", reqURL, err) + return nil, fmt.Errorf("cannot unmarshal data block received from %q: %w", reqURL, err) } valuesBuf = vb src = tail @@ -405,23 +402,37 @@ func (s *Storage) runQuery(stopCh <-chan struct{}, qctx *logstorage.QueryContext ctxWithCancel, cancel := contextutil.NewStopChanContext(stopCh) defer cancel() - qctxLocal := qctx.WithContext(ctxWithCancel) - errs := make([]error, len(s.sns)) + 
queryStats := make([]*logstorage.QueryStats, len(s.sns)) var wg sync.WaitGroup for nodeIdx := range s.sns { wg.Go(func() { sn := s.sns[nodeIdx] - err := sn.runQuery(qctxLocal, func(db *logstorage.DataBlock) { + args := sn.getCommonArgs(QueryProtocolVersion, qctx) + qs, err := sn.runQuery(ctxWithCancel, args, func(db *logstorage.DataBlock) { writeBlock(uint(nodeIdx), db) }) errs[nodeIdx] = sn.handleError(ctxWithCancel, cancel, err, qctx.AllowPartialResponse) + queryStats[nodeIdx] = qs }) } wg.Wait() - return getFirstError(errs, qctx.AllowPartialResponse) + isPartial, err := getPartialResponseError(errs, qctx.AllowPartialResponse) + if err != nil { + return err + } + qctx.QueryStats.MergeQueryCompleteness(!isPartial) + + for _, qs := range queryStats { + if qs == nil { + continue + } + qctx.QueryStats.UpdateAtomic(qs) + } + + return nil } // GetFieldNames executes qctx and returns field names seen in results. @@ -512,7 +523,7 @@ func (s *Storage) DeleteRunTask(ctx context.Context, taskID string, timestamp in } wg.Wait() - return getFirstError(errs, allowPartialResponse) + return getFirstError(errs) } // DeleteStopTask stops the delete task with the given taskID. 
@@ -538,7 +549,7 @@ func (s *Storage) DeleteStopTask(ctx context.Context, taskID string) error { } wg.Wait() - return getFirstError(errs, allowPartialResponse) + return getFirstError(errs) } // DeleteActiveTasks returns the list of active delete tasks started via DeleteRunTask @@ -564,7 +575,7 @@ func (s *Storage) DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTa } wg.Wait() - if err := getFirstError(errs, allowPartialResponse); err != nil { + if err := getFirstError(errs); err != nil { return nil, err } @@ -619,7 +630,7 @@ func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]logstor } wg.Wait() - if err := getFirstError(errs, allowPartialResponse); err != nil { + if err := getFirstError(errs); err != nil { return nil, err } @@ -659,9 +670,11 @@ func (s *Storage) getValuesWithHits(qctx *logstorage.QueryContext, limit uint64, } wg.Wait() - if err := getFirstError(errs, qctx.AllowPartialResponse); err != nil { + isPartial, err := getPartialResponseError(errs, qctx.AllowPartialResponse) + if err != nil { return nil, err } + qctx.QueryStats.MergeQueryCompleteness(!isPartial) vhs := logstorage.MergeValuesWithHits(results, limit, resetHitsOnLimitExceeded) @@ -760,36 +773,57 @@ func (sn *storageNode) handleError(ctx context.Context, cancel func(), err error return err } -func getFirstError(errs []error, allowPartialResponse bool) error { +// getPartialResponseError reports whether the query result is partial and returns the error to pass to the client, if any. +// If allowPartialResponse is false, it returns (false, err), where err is the first non-nil error in errs or nil if there are none. +// If allowPartialResponse is true, errs is scanned in order: +// - returns (false, nil) if every node succeeded, or (false, err) if a non-availability error is met before any successful node. +// - returns (false, err) if all nodes are unavailable. +// - returns (true, nil) if some node failed and a successful node is met before any non-availability error. 
+func getPartialResponseError(errs []error, allowPartialResponse bool) (bool, error) { if len(errs) == 0 { logger.Panicf("BUG: len(errs) must be bigger than 0") } if !allowPartialResponse { - for _, err := range errs { - if err != nil { - return err - } + return false, getFirstError(errs) + } + + noErrors := true + for _, err := range errs { + if err != nil { + noErrors = false + break } - return nil + } + if noErrors { + // Not a partial response. + return false, nil } - // allowPartialResponse == true. Return the error only if all the backends are unavailable - // or if some of the backends are improperly configured. for _, err := range errs { if err == nil { // At least a single vlstorage returned full response. - return nil + return true, nil } if !isUnavailableBackendError(err) { // Return the first error, which isn't related to the backend unavailability, to the client, // since this error may point to configuration issues, which must be fixed ASAP. // Hiding this error would complicate troubleshooting of improperly configured system. - return fmt.Errorf("the vlstorage node is available, but it returns an error, which may point to configuration issues: %w", err) + return false, fmt.Errorf("the vlstorage node is available, but it returns an error, which may point to configuration issues: %w", err) } } - return fmt.Errorf("all the vlstorage nodes are unavailable for querying; a sample error: %w", errs[0]) + return false, fmt.Errorf("all the vlstorage nodes are unavailable for querying; a sample error: %w", errs[0]) +} + +// getFirstError returns the first non-nil error in errs. 
+func getFirstError(errs []error) error { + for _, err := range errs { + if err != nil { + return err + } + } + return nil } func isUnavailableBackendError(err error) bool { diff --git a/docs/victorialogs/CHANGELOG.md b/docs/victorialogs/CHANGELOG.md index b3727cd118..e9c1e943de 100644 --- a/docs/victorialogs/CHANGELOG.md +++ b/docs/victorialogs/CHANGELOG.md @@ -32,6 +32,7 @@ according to the following docs: * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): rename field `Query time` to `Hits query` on the Hits chart panel. The change makes it clear duration of which query is displayed. * FEATURE: [dashboards/kubernetes-explorer](https://github.com/VictoriaMetrics/VictoriaLogs/blob/master/dashboards/victorialogs-kubernetes-explorer.json): add new dashboard for exploring Kubernetes logs via [VictoriaLogs datasource](https://docs.victoriametrics.com/victorialogs/integrations/grafana/) in Grafana. Thanks to @sias32 for [the contribution](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1254)! * FEATURE: [File Collector](https://docs.victoriametrics.com/victorialogs/vlagent/#collect-logs-from-files): enhance glob pattern support with additional syntax: double-star `/**/` for matching nested directories (e.g. `/var/log/**/*.log`), alternatives `{a,b}` for matching multiple specific names (e.g. `{access,error}.log`), character classes `[a-z]` and `?` wildcard for single-character matching. See [#1393](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1393) and [these docs](https://docs.victoriametrics.com/victorialogs/vlagent/#glob-pattern-requirements) for the full pattern syntax reference. +* FEATURE: [querying API](https://docs.victoriametrics.com/victorialogs/querying/): expose the `VL-Partial-Response` HTTP header for all `/select/logsql/*` handlers. 
The header value is set to `true` if the [response is partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses), or `false` if the response is complete. For streaming queries that do not require buffering the final result in memory, this header will contain the value `unknown`. See [#718](https://github.com/VictoriaMetrics/VictoriaLogs/issues/718). * BUGFIX: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): fix [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe) and [`json_array_len` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe) ignoring JSON arrays preceded by whitespace. See [#1427](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1427). * BUGFIX: [vlagent](https://docs.victoriametrics.com/victorialogs/vlagent/): hide sensitive values passed via `-remoteWrite.proxyURL` in `/metrics`, `/flags`, and startup logs. Previously these values could be exposed in plain text. See [#1320](https://github.com/VictoriaMetrics/VictoriaLogs/pull/1320). diff --git a/docs/victorialogs/querying/README.md b/docs/victorialogs/querying/README.md index e3ae257e56..395975fe41 100644 --- a/docs/victorialogs/querying/README.md +++ b/docs/victorialogs/querying/README.md @@ -144,6 +144,7 @@ with [`vl_http_requests_total{path="/select/logsql/query"}`](https://docs.victor The `/select/logsql/query` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). 
See also: @@ -356,6 +357,7 @@ curl http://localhost:9428/select/logsql/hits -H 'AccountID: 12' -H 'ProjectID: The `/select/logsql/hits` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -457,6 +459,7 @@ curl http://localhost:9428/select/logsql/facets -d 'query=_time:1h' -d 'keep_con The `/select/logsql/facets` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -539,6 +542,7 @@ It is used by [vmalert](https://docs.victoriametrics.com/victorialogs/vmalert/). The `/select/logsql/stats_query` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). 
- `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -656,6 +660,7 @@ It is used by [Grafana plugin for VictoriaLogs](https://docs.victoriametrics.com The `/select/logsql/stats_query_range` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -722,6 +727,7 @@ curl http://localhost:9428/select/logsql/stream_ids -H 'AccountID: 12' -H 'Proje The `/select/logsql/stream_ids` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -787,6 +793,7 @@ curl http://localhost:9428/select/logsql/streams -H 'AccountID: 12' -H 'ProjectI The `/select/logsql/streams` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. 
+- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -851,6 +858,7 @@ curl http://localhost:9428/select/logsql/stream_field_names -H 'AccountID: 12' - The `/select/logsql/stream_field_names` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -914,6 +922,7 @@ curl http://localhost:9428/select/logsql/stream_field_values -H 'AccountID: 12' The `/select/logsql/stream_field_values` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). 
See also: @@ -997,6 +1006,7 @@ curl http://localhost:9428/select/logsql/field_names -H 'AccountID: 12' -H 'Proj The `/select/logsql/field_names` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: @@ -1065,6 +1075,7 @@ curl http://localhost:9428/select/logsql/field_values -H 'AccountID: 12' -H 'Pro The `/select/logsql/field_values` returns the following additional HTTP response headers: - `VL-Request-Duration-Seconds` - the duration of the query until the first response byte. +- `VL-Partial-Response` - indicates whether the response is complete (`false`), [partial](https://docs.victoriametrics.com/victorialogs/querying/#partial-responses) due to unavailable storage nodes (`true`), or cannot be determined, e.g., for unbuffered streaming queries (`unknown`). - `AccountID` and `ProjectID` - the requested [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy). See also: diff --git a/lib/logstorage/query_stats.go b/lib/logstorage/query_stats.go index 89f58f08a2..2133b2471e 100644 --- a/lib/logstorage/query_stats.go +++ b/lib/logstorage/query_stats.go @@ -44,6 +44,8 @@ type QueryStats struct { // BytesProcessedUncompressedValues is the total number of uncompressed values bytes processed during the search. BytesProcessedUncompressedValues uint64 + + QueryCompleteness QueryCompleteness } // GetBytesReadTotal returns the total number of bytes read, which is tracked by qs. 
@@ -66,6 +68,71 @@ func (qs *QueryStats) UpdateAtomic(src *QueryStats) { atomic.AddUint64(&qs.ValuesRead, src.ValuesRead) atomic.AddUint64(&qs.TimestampsRead, src.TimestampsRead) atomic.AddUint64(&qs.BytesProcessedUncompressedValues, src.BytesProcessedUncompressedValues) + + mergeQueryCompleteness(&qs.QueryCompleteness, &src.QueryCompleteness) +} + +// QueryCompleteness indicates whether a query returned a full or partial result set. +// Partial results only happen if the `-search.allowPartialResponse` command-line flag or the `allow_partial_response` LogsQL option is set. +// See https://docs.victoriametrics.com/victorialogs/querying/#partial-responses +type QueryCompleteness uint32 + +const ( + // QueryCompletenessNotSet indicates that the value was not set. + // This typically occurs in streaming queries where data is not buffered but flushed periodically without merging a final completeness status. + // In this case, it behaves like QueryCompletenessUnknown, except that QueryCompletenessNotSet has a lower priority during a merge than other statuses. + QueryCompletenessNotSet QueryCompleteness = 0 + + // QueryCompletenessPartial means that at least one storage failed to process the query, and the response contains partially calculated data. + QueryCompletenessPartial QueryCompleteness = 1 + + // QueryCompletenessUnknown indicates a case where it is impossible to determine whether a query is complete or partial. + // This occurs with streaming queries that are not buffered in memory. + QueryCompletenessUnknown QueryCompleteness = 2 + + // QueryCompletenessComplete means that the query was processed by every storage without errors, and the response is complete. 
+ QueryCompletenessComplete QueryCompleteness = 3 +) + +func (qs *QueryStats) MergeQueryCompleteness(complete bool) { + v := QueryCompletenessComplete + if !complete { + v = QueryCompletenessPartial + } + mergeQueryCompleteness(&qs.QueryCompleteness, &v) +} + +func mergeQueryCompleteness(dstPtr, srcPtr *QueryCompleteness) { + src := QueryCompleteness(atomic.LoadUint32((*uint32)(srcPtr))) + if src == QueryCompletenessNotSet { + return + } + srcPriority := getQueryCompletenessPriority(src) + + for { + dst := QueryCompleteness(atomic.LoadUint32((*uint32)(dstPtr))) + dstPriority := getQueryCompletenessPriority(dst) + if srcPriority <= dstPriority { + // src priority is not higher than dst priority, so dst is kept; e.g. Partial must not be overridden with Unknown. + return + } + if atomic.CompareAndSwapUint32((*uint32)(dstPtr), uint32(dst), uint32(src)) { + return + } + } +} + +func getQueryCompletenessPriority(qc QueryCompleteness) uint8 { + switch qc { + case QueryCompletenessPartial: + return 3 + case QueryCompletenessUnknown: + return 2 + case QueryCompletenessComplete: + return 1 + default: + return 0 + } } // UpdateAtomicFromDataBlock adds query stats from db to qs. 
@@ -76,9 +143,12 @@ func (qs *QueryStats) UpdateFromDataBlock(db *DataBlock) error { } var errGlobal error - getUint64Entry := func(name string) uint64 { + getUint64Entry := func(name string, required bool) uint64 { c := db.GetColumnByName(name) if c == nil { + if !required { + return 0 + } if errGlobal == nil { errGlobal = fmt.Errorf("cannot find field %q in query stats received from the remote storage", name) } @@ -89,19 +159,27 @@ func (qs *QueryStats) UpdateFromDataBlock(db *DataBlock) error { return n } - qs.BytesReadColumnsHeaders += getUint64Entry("BytesReadColumnsHeaders") - qs.BytesReadColumnsHeaderIndexes += getUint64Entry("BytesReadColumnsHeaderIndexes") - qs.BytesReadBloomFilters += getUint64Entry("BytesReadBloomFilters") - qs.BytesReadValues += getUint64Entry("BytesReadValues") - qs.BytesReadTimestamps += getUint64Entry("BytesReadTimestamps") - qs.BytesReadBlockHeaders += getUint64Entry("BytesReadBlockHeaders") - - qs.BlocksProcessed += getUint64Entry("BlocksProcessed") - qs.RowsProcessed += getUint64Entry("RowsProcessed") - qs.RowsFound += getUint64Entry("RowsFound") - qs.ValuesRead += getUint64Entry("ValuesRead") - qs.TimestampsRead += getUint64Entry("TimestampsRead") - qs.BytesProcessedUncompressedValues += getUint64Entry("BytesProcessedUncompressedValues") + qs.BytesReadColumnsHeaders += getUint64Entry("BytesReadColumnsHeaders", true) + qs.BytesReadColumnsHeaderIndexes += getUint64Entry("BytesReadColumnsHeaderIndexes", true) + qs.BytesReadBloomFilters += getUint64Entry("BytesReadBloomFilters", true) + qs.BytesReadValues += getUint64Entry("BytesReadValues", true) + qs.BytesReadTimestamps += getUint64Entry("BytesReadTimestamps", true) + qs.BytesReadBlockHeaders += getUint64Entry("BytesReadBlockHeaders", true) + + qs.BlocksProcessed += getUint64Entry("BlocksProcessed", true) + qs.RowsProcessed += getUint64Entry("RowsProcessed", true) + qs.RowsFound += getUint64Entry("RowsFound", true) + qs.ValuesRead += getUint64Entry("ValuesRead", true) + 
qs.TimestampsRead += getUint64Entry("TimestampsRead", true) + qs.BytesProcessedUncompressedValues += getUint64Entry("BytesProcessedUncompressedValues", true) + + v := QueryCompleteness(getUint64Entry("QueryCompleteness", false)) + if v == QueryCompletenessNotSet { + // We received NotSet, which means a backend flushed the data without waiting for the final status. + // Override the value to Unknown to distinguish it from the default value (NotSet). + v = QueryCompletenessUnknown + } + mergeQueryCompleteness(&qs.QueryCompleteness, &v) return errGlobal } @@ -121,6 +199,9 @@ func (qs *QueryStats) CreateDataBlock(queryDurationNsecs int64) *DataBlock { qs.addEntries(addUint64Entry, queryDurationNsecs) + // An internal field that should not be shown to the user. + addUint64Entry("QueryCompleteness", uint64(qs.QueryCompleteness)) + return &DataBlock{ columns: cs, } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 2b1f6a4497..0dd72a2da2 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -217,6 +217,9 @@ func (s *Storage) RunQuery(qctx *QueryContext, writeBlock WriteDataBlockFunc) er type runQueryFunc func(qctx *QueryContext, writeBlock writeBlockResultFunc) error func (s *Storage) runQuery(qctx *QueryContext, writeBlock writeBlockResultFunc) error { + // Mark the response as complete. + qctx.QueryStats.MergeQueryCompleteness(true) + qNew, err := initSubqueries(qctx, s.runQuery, false) if err != nil { return err