Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func ProcessFacetsRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write response
WriteFacetsResponse(w, m)
Expand Down Expand Up @@ -310,7 +310,7 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write response
WriteHitsSeries(w, m)
Expand Down Expand Up @@ -450,7 +450,7 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write results
WriteValuesWithHitsJSON(w, fieldNames)
Expand Down Expand Up @@ -498,7 +498,7 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write results
WriteValuesWithHitsJSON(w, values)
Expand Down Expand Up @@ -532,7 +532,7 @@ func ProcessStreamFieldNamesRequest(ctx context.Context, w http.ResponseWriter,
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write results
WriteValuesWithHitsJSON(w, names)
Expand Down Expand Up @@ -580,7 +580,7 @@ func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter,
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write results
WriteValuesWithHitsJSON(w, values)
Expand Down Expand Up @@ -618,7 +618,7 @@ func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write results
WriteValuesWithHitsJSON(w, streamIDs)
Expand Down Expand Up @@ -656,7 +656,7 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write results
WriteValuesWithHitsJSON(w, streams)
Expand Down Expand Up @@ -1030,7 +1030,7 @@ func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write response
WriteStatsQueryRangeResponse(w, rows)
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
h := w.Header()

h.Set("Content-Type", "application/json")
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

// Write response
WriteStatsQueryResponse(w, rows)
Expand Down Expand Up @@ -1245,6 +1245,9 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
}()

qctx := ca.newQueryContext(ctx)
defer ca.updatePerQueryStatsMetrics()

startTime := time.Now()
writeResponseHeadersOnce := sync.OnceFunc(func() {
// Write response headers
Expand All @@ -1255,7 +1258,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
} else {
h.Set("Content-Type", "application/stream+json")
}
ca.writeResponseHeaders(h, startTime)
ca.writeResponseHeaders(h, qctx.QueryStats, startTime)

if format == "csv" {
_, _ = sw.Write(csvHeader)
Expand Down Expand Up @@ -1293,9 +1296,6 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
}

qctx := ca.newQueryContext(ctx)
defer ca.updatePerQueryStatsMetrics()

// Execute the query
if err := vlstorage.RunQuery(qctx, writeBlock); err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", ca.q, err)
Expand Down Expand Up @@ -1808,9 +1808,9 @@ func getStringSliceFromRequest(r *http.Request, argName string) ([]string, error
return a, nil
}

func (ca *commonArgs) writeResponseHeaders(h http.Header, startTime time.Time) {
func (ca *commonArgs) writeResponseHeaders(h http.Header, qs *logstorage.QueryStats, startTime time.Time) {
// Write request duration
accessControlExposeHeaders := []string{"VL-Request-Duration-Seconds"}
accessControlExposeHeaders := []string{"VL-Request-Duration-Seconds", "VL-Partial-Response"}
h.Set("VL-Request-Duration-Seconds", fmt.Sprintf("%.3f", time.Since(startTime).Seconds()))

if len(ca.tenantIDs) == 1 {
Expand All @@ -1821,12 +1821,32 @@ func (ca *commonArgs) writeResponseHeaders(h http.Header, startTime time.Time) {
h.Set("ProjectID", fmt.Sprintf("%d", tenantID.ProjectID))
}

qc := qs.QueryCompleteness
h.Set("VL-Partial-Response", getPartialResponseHeaderValue(qc))
Comment on lines +1824 to +1825
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This read may race with atomic writes to qs.QueryCompleteness. Adding a method such as QueryStats.GetQueryCompleteness(), which uses atomic.LoadUint32(), and use it here?

Copy link
Copy Markdown
Member Author

@vadimalekseev vadimalekseev May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, there was such a method, but I removed it to simplify the code, because:

  1. Only one goroutine can call writeResponseHeaders.
  2. The other goroutines wait for the first one to finish calling writeResponseHeaders here:
    before updating the QueryCompleteness value here:
    defer qctx.QueryStats.UpdateAtomic(qsLocal)

Should we explicitly sync this, do you think?

Copy link
Copy Markdown
Member Author

@vadimalekseev vadimalekseev May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I double-checked the code, and the fact that it doesn't need synchronization is really counter-intuitive. I will add explicit synchronization

See the comment bellow.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this counter-intuitive behavior is basically the core of the whole PR: we cant let another goroutine write QueryCompletenessComplete, otherwise we'll set the partial response to 'false' for a streaming request.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More explicit (but complex) implementation described here: #718 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't understand, I don't mean waiting until all goroutines finish before writing the header. I mean using atomic load for reading QueryCompleteness, since the field is updated via atomic/CAS elsewhere. This doesn't change the timing. Could you explain more?

Copy link
Copy Markdown
Member Author

@vadimalekseev vadimalekseev May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @func25, I've updated the runQuery function, so QueryStats updates only after the wg.Done(). Could you please check again?

Previously, there actually could have been a race condition if an empty response was received from vlstorage. In that case, we would never have called writeBlock, which used to act as a mutex due to writeResponseHeadersOnce.

This could lead to a situation where one vlstorage returned an empty response, while the second one was unavailable. In this case, we could potentially set QueryCompletenessComplete instead of Partial. The current code should avoid this situation.


for i, v := range accessControlExposeHeaders {
accessControlExposeHeaders[i] = http.CanonicalHeaderKey(v)
}
h.Set("Access-Control-Expose-Headers", strings.Join(accessControlExposeHeaders, ", "))
}

func getPartialResponseHeaderValue(qc logstorage.QueryCompleteness) string {
switch qc {
case logstorage.QueryCompletenessNotSet:
// qs is not set, means the response is flushed without waiting full response.
// Treat this as "unknown".
return "unknown"
case logstorage.QueryCompletenessPartial:
return "true"
case logstorage.QueryCompletenessUnknown:
return "unknown"
case logstorage.QueryCompletenessComplete:
return "false"
default:
panic(fmt.Errorf("BUG: unexpected QueryCompleteness value %d", qc))
}
}

func parseDuration(r *http.Request, argName, defaultValue string) (int64, error) {
s := r.FormValue(argName)
if s == "" {
Expand Down
Loading