diff --git a/docs/victorialogs/CHANGELOG.md b/docs/victorialogs/CHANGELOG.md index 7bd39b6cea..2639c1ad52 100644 --- a/docs/victorialogs/CHANGELOG.md +++ b/docs/victorialogs/CHANGELOG.md @@ -24,6 +24,7 @@ according to the following docs: * SECURITY: upgrade Go builder from Go1.26.2 to Go1.26.3. See [the list of issues addressed in Go1.26.3](https://github.com/golang/go/issues?q=milestone%3AGo1.26.3%20label%3ACherryPickApproved). +* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) for joining JSON array items stored in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string with the given delimiter. See [#712](https://github.com/VictoriaMetrics/VictoriaLogs/issues/712). * FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`coalesce` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#coalesce-pipe), which returns the first non-empty value from the given list of fields. This is useful when the same value may be stored under different field names across logs, such as `user_id`, `username` or `email`. The pipe can also return a default value when all the fields are empty. See [#690](https://github.com/VictoriaMetrics/VictoriaLogs/issues/690). Thanks to @warkadiusz for [the pull request #904](https://github.com/VictoriaMetrics/VictoriaLogs/pull/904). * FEATURE: [querying API](https://docs.victoriametrics.com/victorialogs/querying/): allow using [`limit`](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) and [`offset`](https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe) pipes after the [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) in queries to [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats). This enables the usage for these pipes in [alerting and recording rules for VictoriaLogs](https://docs.victoriametrics.com/victorialogs/vmalert/). See [#1296](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1296). * FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaLogs/blob/master/deployment/docker/rules): add new alerting rules `PersistentQueueRunsOutOfSpaceIn12Hours` and `PersistentQueueRunsOutOfSpaceIn4Hours` for `vlagent` persistent queue capacity. These alerts help users to take proactive actions before `vlagent` starts dropping logs due to insufficient persistent queue space. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193) diff --git a/docs/victorialogs/logsql.md b/docs/victorialogs/logsql.md index 9504e3badb..5959012772 100644 --- a/docs/victorialogs/logsql.md +++ b/docs/victorialogs/logsql.md @@ -1851,6 +1851,7 @@ LogsQL supports the following pipes: - [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) formats output field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`generate_sequence`](https://docs.victoriametrics.com/victorialogs/logsql/#generate_sequence-pipe) generates output logs with messages containing integer sequence. - [`join`](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe) joins query results by the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +- [`json_array_concat`](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) joins JSON array items stored at the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string using the given delimiter. - [`json_array_len`](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe) returns the length of JSON array stored at the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`hash`](https://docs.victoriametrics.com/victorialogs/logsql/#hash-pipe) returns the hash over the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value. @@ -2727,6 +2728,35 @@ See also: - [adding static logs](https://docs.victoriametrics.com/victorialogs/logsql/#adding-static-logs) +### json_array_concat pipe + +The ` | json_array_concat [delimiter] [from ] [as ]` [pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) joins items of the JSON array stored in `` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) obtained from `` [query](https://docs.victoriametrics.com/victorialogs/logsql/#query-syntax) results into a string using the given `delimiter`, and stores the result into ``. + +For example, the following query joins items of the `tags` JSON array using `,` as delimiter and stores the result back into the `tags` field: + +```logsql +_time:5m | json_array_concat "," from tags +``` + +The `as ` part is optional. If it is missing, then the result is stored in the `` specified in `from `. +For example, the following query stores the result into a separate `tags_str` field: + +```logsql +_time:5m | json_array_concat "," from tags as tags_str +``` + +The `from ` part is optional. If it is missing, then the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) is used as the source field. + +The `delimiter` is optional. If it is missing, then items are concatenated without any separator. + +If `` does not contain a valid JSON array, the result is an empty string. + +See also: + +- [`split` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#split-pipe) +- [`json_array_len` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe) +- [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe) + ### json_array_len pipe ` | json_array_len(field) as result_field` calculates the length of JSON array at the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) @@ -3467,6 +3497,7 @@ _time:5m | split "," as items | unroll items | top 5 (items) See also: +- [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) - [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe) - [`unpack_words` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_words-pipe) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 6490661e69..142098bcce 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -200,6 +200,7 @@ func initPipeParsers() { "generate_sequence": parsePipeGenerateSequence, "hash": parsePipeHash, "join": parsePipeJoin, + "json_array_concat": parsePipeJSONArrayConcat, "json_array_len": parsePipeJSONArrayLen, "head": parsePipeLimit, "keep": parsePipeFields, diff --git a/lib/logstorage/pipe_json_array_concat.go b/lib/logstorage/pipe_json_array_concat.go new file mode 100644 index 0000000000..84b38af2d2 --- /dev/null +++ b/lib/logstorage/pipe_json_array_concat.go @@ -0,0 +1,213 @@ +package logstorage + +import ( + "fmt" + + "github.com/valyala/fastjson" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + + "github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter" +) + +// pipeJSONArrayConcat processes '| json_array_concat ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe +type pipeJSONArrayConcat struct { + delimiter string + fromField string + resultField string +} + +func (pc *pipeJSONArrayConcat) String() string { + s := "json_array_concat" + if pc.delimiter != "" { + s += " " + quoteTokenIfNeeded(pc.delimiter) + } + if !isMsgFieldName(pc.fromField) { + s += " from " + quoteTokenIfNeeded(pc.fromField) + } + if pc.resultField != pc.fromField { + s += " as " + quoteTokenIfNeeded(pc.resultField) + } + return s +} + +func (pc *pipeJSONArrayConcat) splitToRemoteAndLocal(_ int64) (pipe, []pipe) { + return pc, nil +} + +func (pc *pipeJSONArrayConcat) canLiveTail() bool { + return true +} + +func (pc *pipeJSONArrayConcat) canReturnLastNResults() bool { + return pc.resultField != "_time" +} + +func (pc *pipeJSONArrayConcat) isFixedOutputFieldsOrder() bool { + return false +} + +func (pc *pipeJSONArrayConcat) updateNeededFields(pf *prefixfilter.Filter) { + if pf.MatchString(pc.resultField) { + pf.AddDenyFilter(pc.resultField) + pf.AddAllowFilter(pc.fromField) + } +} + +func (pc *pipeJSONArrayConcat) hasFilterInWithQuery() bool { + return false +} + +func (pc *pipeJSONArrayConcat) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) { + return pc, nil +} + +func (pc *pipeJSONArrayConcat) visitSubqueries(_ func(q *Query)) { + // nothing to do +} + +func (pc *pipeJSONArrayConcat) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + pcp := &pipeJSONArrayConcatProcessor{ + pc: pc, + ppNext: ppNext, + } + pcp.shards.Init = func(shard *pipeJSONArrayConcatProcessorShard) { + shard.reset() + } + return pcp +} + +type pipeJSONArrayConcatProcessor struct { + pc *pipeJSONArrayConcat + ppNext pipeProcessor + + shards atomicutil.Slice[pipeJSONArrayConcatProcessorShard] +} + +func (plp *pipeJSONArrayConcatProcessor) writeBlock(workerID uint, br *blockResult) { + if br.rowsLen == 0 { + return + } + + shard := plp.shards.Get(workerID) + shard.rc.name = plp.pc.resultField + + c := br.getColumnByName(plp.pc.fromField) + delimiter := plp.pc.delimiter + if c.isConst { + // Fast path for const column + v := c.valuesEncoded[0] + out := shard.concat(v, delimiter) + shard.rc.addValue(out) + br.addResultColumnConst(shard.rc) + } else { + // Slow path for other columns + values := c.getValues(br) + prevOut := "" + for rowIdx := range values { + if rowIdx == 0 || values[rowIdx] != values[rowIdx-1] { + prevOut = shard.concat(values[rowIdx], delimiter) + } + shard.rc.addValue(prevOut) + } + br.addResultColumn(shard.rc) + } + + plp.ppNext.writeBlock(workerID, br) + + shard.reset() +} + +type pipeJSONArrayConcatProcessorShard struct { + a arena + rc resultColumn +} + +func (shard *pipeJSONArrayConcatProcessorShard) reset() { + shard.a.reset() + shard.rc.reset() +} + +func (shard *pipeJSONArrayConcatProcessorShard) concat(arrayStr, delimiter string) string { + arrayStr = trimJSONWhitespace(arrayStr) + if len(arrayStr) == 0 || arrayStr[0] != '[' { + return "" + } + + p := jspp.Get() + defer jspp.Put(p) + + jsv, err := p.Parse(arrayStr) + if err != nil { + return "" + } + jsa, err := jsv.Array() + if err != nil || len(jsa) == 0 { + return "" + } + + bLen := len(shard.a.b) + for i, item := range jsa { + if i > 0 { + shard.a.b = append(shard.a.b, delimiter...) + } + if item.Type() == fastjson.TypeString { + sb, _ := item.StringBytes() + shard.a.b = append(shard.a.b, sb...) + } else { + shard.a.b = item.MarshalTo(shard.a.b) + } + } + return bytesutil.ToUnsafeString(shard.a.b[bLen:]) +} + +func (plp *pipeJSONArrayConcatProcessor) flush() error { + return nil +} + +func parsePipeJSONArrayConcat(lex *lexer) (pipe, error) { + if !lex.isKeyword("json_array_concat") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "json_array_concat") + } + lex.nextToken() + + delimiter := "" + if !lex.isKeyword("from", "as", "|", ")", "") { + s, err := lex.nextCompoundToken() + if err != nil { + return nil, fmt.Errorf("cannot parse delimiter for 'json_array_concat': %w", err) + } + delimiter = s + } + + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field for 'json_array_concat': %w", err) + } + fromField = f + } + + resultField := fromField + if lex.isKeyword("as") { + lex.nextToken() + } + if !lex.isKeyword("|", ")", "") { + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result field for 'json_array_concat': %w", err) + } + resultField = f + } + + return &pipeJSONArrayConcat{ + delimiter: delimiter, + fromField: fromField, + resultField: resultField, + }, nil +} diff --git a/lib/logstorage/pipe_json_array_concat_test.go b/lib/logstorage/pipe_json_array_concat_test.go new file mode 100644 index 0000000000..ec7f2e7d58 --- /dev/null +++ b/lib/logstorage/pipe_json_array_concat_test.go @@ -0,0 +1,170 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeJSONArrayConcatSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`json_array_concat`) + f(`json_array_concat ","`) + f(`json_array_concat ", "`) + f(`json_array_concat from foo`) + f(`json_array_concat as bar`) + f(`json_array_concat "," from foo`) + f(`json_array_concat "," as bar`) + f(`json_array_concat from foo as bar`) + f(`json_array_concat "," from foo as bar`) +} + +func TestParsePipeJSONArrayConcatFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`json_array_concat from`) + f(`json_array_concat "," from *`) + f(`json_array_concat "," from foo*`) + f(`json_array_concat "," from foo as *`) + f(`json_array_concat "," from foo as bar*`) + f(`json_array_concat "," from foo as bar baz`) +} + +func TestPipeJSONArrayConcat(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // basic + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `["a","b","c"]`}}, + }, [][]Field{ + {{"foo", "a,b,c"}}, + }) + + // no delimiter + f(`json_array_concat from foo`, [][]Field{ + {{"foo", `["a","b","c"]`}}, + }, [][]Field{ + {{"foo", "abc"}}, + }) + + // single element + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `["only"]`}}, + }, [][]Field{ + {{"foo", "only"}}, + }) + + // non-string item types + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `["hello",42,true,{"a":1},null]`}}, + }, [][]Field{ + {{"foo", `hello,42,true,{"a":1},null`}}, + }) + + // empty array + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `[]`}}, + }, [][]Field{ + {{"foo", ""}}, + }) + + // non-array input + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `not-an-array`}}, + }, [][]Field{ + {{"foo", ""}}, + }) + + // missing source field + f(`json_array_concat "," from foo`, [][]Field{ + {{"bar", "baz"}}, + }, [][]Field{ + {{"bar", "baz"}, {"foo", ""}}, + }) + + // write result to a different field, source field unchanged + f(`json_array_concat "," from foo as result`, [][]Field{ + {{"foo", `["a","b","c"]`}}, + }, [][]Field{ + {{"foo", `["a","b","c"]`}, {"result", "a,b,c"}}, + }) + + // default source and result field (_msg) + f(`json_array_concat ","`, [][]Field{ + {{"_msg", `["x","y","z"]`}}, + }, [][]Field{ + {{"_msg", "x,y,z"}}, + }) + + // JSON whitespace around JSON array + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", ` ["a","b","c"]`}}, + {{"foo", "\t[\"d\",\"e\",\"f\"]"}}, + {{"foo", "\n[\"g\",\"h\",\"i\"]"}}, + {{"foo", "\r[\"j\",\"k\",\"l\"]"}}, + {{"foo", " \t\n\r[\"m\",\"n\",\"o\"] \r\n\t"}}, + }, [][]Field{ + {{"foo", "a,b,c"}}, + {{"foo", "d,e,f"}}, + {{"foo", "g,h,i"}}, + {{"foo", "j,k,l"}}, + {{"foo", "m,n,o"}}, + }) + + // malformed JSON array starting with [ + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `["a"`}}, + {{"foo", `[1,`}}, + }, [][]Field{ + {{"foo", ""}}, + {{"foo", ""}}, + }) + + // slow path: multiple rows with different and repeated values + f(`json_array_concat "," from foo`, [][]Field{ + {{"foo", `["a","b"]`}}, + {{"foo", `["x","y","z"]`}}, + {{"foo", `["a","b"]`}}, + }, [][]Field{ + {{"foo", "a,b"}}, + {{"foo", "x,y,z"}}, + {{"foo", "a,b"}}, + }) +} + +func TestPipeJSONArrayConcatUpdateNeededFields(t *testing.T) { + f := func(s string, allowFilters, denyFilters, allowFiltersExpected, denyFiltersExpected string) { + t.Helper() + expectPipeNeededFields(t, s, allowFilters, denyFilters, allowFiltersExpected, denyFiltersExpected) + } + + // all the needed fields + f(`json_array_concat "," from y as x`, "*", "", "*", "x") + f(`json_array_concat "," from x as x`, "*", "", "*", "") + + // unneeded fields do not intersect with output field + f(`json_array_concat "," from y as x`, "*", "f1,f2", "*", "f1,f2,x") + f(`json_array_concat "," from x as x`, "*", "f1,f2", "*", "f1,f2") + + // unneeded fields intersect with output field + f(`json_array_concat "," from z as x`, "*", "x,y", "*", "x,y") + f(`json_array_concat "," from y as x`, "*", "x,y", "*", "x,y") + f(`json_array_concat "," from x as x`, "*", "x,y", "*", "x,y") + + // needed fields do not intersect with output field + f(`json_array_concat "," from y as z`, "x,y", "", "x,y", "") + f(`json_array_concat "," from z as z`, "x,y", "", "x,y", "") + + // needed fields intersect with output field + f(`json_array_concat "," from z as f2`, "f2,y", "", "y,z", "") + f(`json_array_concat "," from y as f2`, "f2,y", "", "y", "") + f(`json_array_concat "," from y as y`, "f2,y", "", "f2,y", "") +}