1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
@@ -24,6 +24,7 @@ according to the following docs:

* SECURITY: upgrade Go builder from Go1.26.2 to Go1.26.3. See [the list of issues addressed in Go1.26.3](https://github.com/golang/go/issues?q=milestone%3AGo1.26.3%20label%3ACherryPickApproved).

* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) for joining JSON array items stored in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string with the given delimiter. See [#712](https://github.com/VictoriaMetrics/VictoriaLogs/issues/712).
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`coalesce` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#coalesce-pipe), which returns the first non-empty value from the given list of fields. This is useful when the same value may be stored under different field names across logs, such as `user_id`, `username` or `email`. The pipe can also return a default value when all the fields are empty. See [#690](https://github.com/VictoriaMetrics/VictoriaLogs/issues/690). Thanks to @warkadiusz for [the pull request #904](https://github.com/VictoriaMetrics/VictoriaLogs/pull/904).
* FEATURE: [querying API](https://docs.victoriametrics.com/victorialogs/querying/): allow using [`limit`](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) and [`offset`](https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe) pipes after the [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) in queries to [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats). This enables the usage for these pipes in [alerting and recording rules for VictoriaLogs](https://docs.victoriametrics.com/victorialogs/vmalert/). See [#1296](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1296).
* FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaLogs/blob/master/deployment/docker/rules): add new alerting rules `PersistentQueueRunsOutOfSpaceIn12Hours` and `PersistentQueueRunsOutOfSpaceIn4Hours` for `vlagent` persistent queue capacity. These alerts help users to take proactive actions before `vlagent` starts dropping logs due to insufficient persistent queue space. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193)
31 changes: 31 additions & 0 deletions docs/victorialogs/logsql.md
@@ -1851,6 +1851,7 @@ LogsQL supports the following pipes:
- [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) formats output field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`generate_sequence`](https://docs.victoriametrics.com/victorialogs/logsql/#generate_sequence-pipe) generates output logs with messages containing integer sequence.
- [`join`](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe) joins query results by the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`json_array_concat`](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) joins JSON array items stored at the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string using the given delimiter.
- [`json_array_len`](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe) returns the length of JSON array stored
at the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`hash`](https://docs.victoriametrics.com/victorialogs/logsql/#hash-pipe) returns the hash over the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value.
@@ -2727,6 +2728,35 @@ See also:
- [adding static logs](https://docs.victoriametrics.com/victorialogs/logsql/#adding-static-logs)


### json_array_concat pipe

The `<q> | json_array_concat [delimiter] [from <src_field>] [as <result_field>]` [pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) joins items of the JSON array stored in `<src_field>` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) obtained from `<q>` [query](https://docs.victoriametrics.com/victorialogs/logsql/#query-syntax) results into a string using the given `delimiter`, and stores the result into `<result_field>`.

For example, the following query joins items of the `tags` JSON array using `,` as delimiter and stores the result back into the `tags` field:

```logsql
_time:5m | json_array_concat "," from tags
```

The `as <result_field>` part is optional. If it is missing, then the result is stored in the `<src_field>` specified in `from <src_field>`.
For example, the following query stores the result into a separate `tags_str` field:

```logsql
_time:5m | json_array_concat "," from tags as tags_str
```

The `from <src_field>` part is optional. If it is missing, then the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) is used as the source field.

The `delimiter` is optional. If it is missing, then items are concatenated without any separator.

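If `<src_field>` does not contain a valid JSON array, or the array is empty, the result is an empty string. String items are joined without the surrounding quotes, while other items (numbers, booleans, `null`, nested arrays and objects) are joined in their JSON representation.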

See also:

- [`split` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#split-pipe)
- [`json_array_len` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe)
- [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe)

### json_array_len pipe

`<q> | json_array_len(field) as result_field` calculates the length of JSON array at the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@@ -3467,6 +3497,7 @@ _time:5m | split "," as items | unroll items | top 5 (items)

See also:

- [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe)
- [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe)
- [`unpack_words` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_words-pipe)

1 change: 1 addition & 0 deletions lib/logstorage/pipe.go
@@ -200,6 +200,7 @@ func initPipeParsers() {
"generate_sequence": parsePipeGenerateSequence,
"hash": parsePipeHash,
"join": parsePipeJoin,
"json_array_concat": parsePipeJSONArrayConcat,
"json_array_len": parsePipeJSONArrayLen,
"head": parsePipeLimit,
"keep": parsePipeFields,
212 changes: 212 additions & 0 deletions lib/logstorage/pipe_json_array_concat.go
@@ -0,0 +1,212 @@
package logstorage

import (
	"fmt"

	"github.com/valyala/fastjson"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"

	"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)

// pipeJSONArrayConcat processes '| json_array_concat ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe
type pipeJSONArrayConcat struct {
	delimiter   string
	fromField   string
	resultField string
}

func (pc *pipeJSONArrayConcat) String() string {
	s := "json_array_concat"
	if pc.delimiter != "" {
		s += " " + quoteTokenIfNeeded(pc.delimiter)
	}
	if !isMsgFieldName(pc.fromField) {
		s += " from " + quoteTokenIfNeeded(pc.fromField)
	}
	if pc.resultField != pc.fromField {
		s += " as " + quoteTokenIfNeeded(pc.resultField)
	}
	return s
}

func (pc *pipeJSONArrayConcat) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
	return pc, nil
}

func (pc *pipeJSONArrayConcat) canLiveTail() bool {
	return true
}

func (pc *pipeJSONArrayConcat) canReturnLastNResults() bool {
	return pc.resultField != "_time"
}

func (pc *pipeJSONArrayConcat) isFixedOutputFieldsOrder() bool {
	return false
}

func (pc *pipeJSONArrayConcat) updateNeededFields(pf *prefixfilter.Filter) {
	if pf.MatchString(pc.resultField) {
		pf.AddDenyFilter(pc.resultField)
		pf.AddAllowFilter(pc.fromField)
	}
}

func (pc *pipeJSONArrayConcat) hasFilterInWithQuery() bool {
	return false
}

func (pc *pipeJSONArrayConcat) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) {
	return pc, nil
}

func (pc *pipeJSONArrayConcat) visitSubqueries(_ func(q *Query)) {
	// nothing to do
}

func (pc *pipeJSONArrayConcat) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
	pcp := &pipeJSONArrayConcatProcessor{
		pc:     pc,
		ppNext: ppNext,
	}
	pcp.shards.Init = func(shard *pipeJSONArrayConcatProcessorShard) {
		shard.reset()
	}
	return pcp
}

type pipeJSONArrayConcatProcessor struct {
	pc     *pipeJSONArrayConcat
	ppNext pipeProcessor

	shards atomicutil.Slice[pipeJSONArrayConcatProcessorShard]
}

func (plp *pipeJSONArrayConcatProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := plp.shards.Get(workerID)
	shard.rc.name = plp.pc.resultField

	c := br.getColumnByName(plp.pc.fromField)
	delimiter := plp.pc.delimiter
	if c.isConst {
		// Fast path for const column
		v := c.valuesEncoded[0]
		out := shard.concat(v, delimiter)
		shard.rc.addValue(out)
		br.addResultColumnConst(shard.rc)
	} else {
		// Slow path for other columns
		values := c.getValues(br)
		prevOut := ""
		for rowIdx := range values {
			if rowIdx == 0 || values[rowIdx] != values[rowIdx-1] {
				prevOut = shard.concat(values[rowIdx], delimiter)
			}
			shard.rc.addValue(prevOut)
		}
		br.addResultColumn(shard.rc)
	}

	plp.ppNext.writeBlock(workerID, br)

	shard.reset()
}

type pipeJSONArrayConcatProcessorShard struct {
	a  arena
	rc resultColumn
}

func (shard *pipeJSONArrayConcatProcessorShard) reset() {
	shard.a.reset()
	shard.rc.reset()
}

func (shard *pipeJSONArrayConcatProcessorShard) concat(arrayStr, delimiter string) string {
	if len(arrayStr) == 0 || arrayStr[0] != '[' {
		return ""
	}

	p := jspp.Get()
	defer jspp.Put(p)

	jsv, err := p.Parse(arrayStr)
	if err != nil {
		return ""
	}
	jsa, err := jsv.Array()
	if err != nil || len(jsa) == 0 {
		return ""
	}

	bLen := len(shard.a.b)
	for i, item := range jsa {
		if i > 0 {
			shard.a.b = append(shard.a.b, delimiter...)
		}
		if item.Type() == fastjson.TypeString {
			sb, _ := item.StringBytes()
			shard.a.b = append(shard.a.b, sb...)
		} else {
			shard.a.b = item.MarshalTo(shard.a.b)
		}
	}
	return bytesutil.ToUnsafeString(shard.a.b[bLen:])
}

func (plp *pipeJSONArrayConcatProcessor) flush() error {
	return nil
}

func parsePipeJSONArrayConcat(lex *lexer) (pipe, error) {
	if !lex.isKeyword("json_array_concat") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "json_array_concat")
	}
	lex.nextToken()

	delimiter := ""
	if !lex.isKeyword("from", "as", "|", ")", "") {
		s, err := lex.nextCompoundToken()
		if err != nil {
			return nil, fmt.Errorf("cannot parse delimiter for 'json_array_concat': %w", err)
		}
		delimiter = s
	}

	fromField := "_msg"
	if lex.isKeyword("from") {
		lex.nextToken()
		f, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse 'from' field for 'json_array_concat': %w", err)
		}
		fromField = f
	}

	resultField := fromField
	if lex.isKeyword("as") {
		lex.nextToken()
	}
	if !lex.isKeyword("|", ")", "") {
		f, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse result field for 'json_array_concat': %w", err)
		}
		resultField = f
	}

	return &pipeJSONArrayConcat{
		delimiter:   delimiter,
		fromField:   fromField,
		resultField: resultField,
	}, nil
}
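
The joining rules implemented by `concat` can be tried out in isolation with the standalone sketch below. It is not the code from this change: it swaps `fastjson` and the arena buffer for the standard `encoding/json` package, and `concatJSONArray` is a hypothetical name used only for illustration. Under those assumptions it follows the same rules: a value that is not a valid JSON array (or an empty array) gives an empty string, string items are added unquoted, and other items are added as their JSON text. For nested arrays and objects this sketch keeps the original text as-is, which may differ in whitespace from what `fastjson` re-serializes.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// concatJSONArray is a hypothetical, standalone approximation of the
// json_array_concat joining rules. It is an illustration only and is
// not part of the logstorage package.
func concatJSONArray(arrayStr, delimiter string) string {
	// Cheap pre-check: anything that does not start with '[' cannot be a JSON array.
	if len(arrayStr) == 0 || arrayStr[0] != '[' {
		return ""
	}

	// Keep every item as raw JSON text, so non-string items can be emitted verbatim.
	var items []json.RawMessage
	if err := json.Unmarshal([]byte(arrayStr), &items); err != nil {
		return ""
	}

	parts := make([]string, 0, len(items))
	for _, item := range items {
		if len(item) > 0 && item[0] == '"' {
			// String item: decode it, so quotes and escape sequences are removed.
			var s string
			if err := json.Unmarshal(item, &s); err != nil {
				return ""
			}
			parts = append(parts, s)
			continue
		}
		// Number, boolean, null, nested array or object: keep the JSON text.
		parts = append(parts, string(item))
	}

	// Joining an empty slice yields "", which matches the empty-array case.
	return strings.Join(parts, delimiter)
}

func main() {
	fmt.Println(concatJSONArray(`["a","b","c"]`, ","))
	fmt.Println(concatJSONArray(`["a",1,true,null]`, "-"))
	fmt.Printf("%q\n", concatJSONArray(`not an array`, ","))
}
```

A sketch like this is mainly useful for checking expectations about edge cases (empty delimiter, mixed item types, invalid input) before encoding them as test cases against the real pipe.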