1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
@@ -24,6 +24,7 @@ according to the following docs:

* SECURITY: upgrade Go builder from Go1.26.2 to Go1.26.3. See [the list of issues addressed in Go1.26.3](https://github.com/golang/go/issues?q=milestone%3AGo1.26.3%20label%3ACherryPickApproved).

* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) for joining JSON array items stored in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string with the given delimiter. See [#712](https://github.com/VictoriaMetrics/VictoriaLogs/issues/712).
* FEATURE: [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/): add [`coalesce` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#coalesce-pipe), which returns the first non-empty value from the given list of fields. This is useful when the same value may be stored under different field names across logs, such as `user_id`, `username` or `email`. The pipe can also return a default value when all the fields are empty. See [#690](https://github.com/VictoriaMetrics/VictoriaLogs/issues/690). Thanks to @warkadiusz for [the pull request #904](https://github.com/VictoriaMetrics/VictoriaLogs/pull/904).
* FEATURE: [querying API](https://docs.victoriametrics.com/victorialogs/querying/): allow using [`limit`](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) and [`offset`](https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe) pipes after the [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) in queries to [`/select/logsql/stats_query`](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats). This enables the use of these pipes in [alerting and recording rules for VictoriaLogs](https://docs.victoriametrics.com/victorialogs/vmalert/). See [#1296](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1296).
* FEATURE: [alerts](https://github.com/VictoriaMetrics/VictoriaLogs/blob/master/deployment/docker/rules): add new alerting rules `PersistentQueueRunsOutOfSpaceIn12Hours` and `PersistentQueueRunsOutOfSpaceIn4Hours` for `vlagent` persistent queue capacity. These alerts help users act before `vlagent` starts dropping logs due to insufficient persistent queue space. See [#10193](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10193).
31 changes: 31 additions & 0 deletions docs/victorialogs/logsql.md
@@ -1851,6 +1851,7 @@ LogsQL supports the following pipes:
- [`format`](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe) formats output field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`generate_sequence`](https://docs.victoriametrics.com/victorialogs/logsql/#generate_sequence-pipe) generates output logs with messages containing integer sequence.
- [`join`](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe) joins query results by the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`json_array_concat`](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe) joins JSON array items stored at the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into a string using the given delimiter.
- [`json_array_len`](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe) returns the length of JSON array stored
at the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`hash`](https://docs.victoriametrics.com/victorialogs/logsql/#hash-pipe) returns the hash over the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value.
@@ -2727,6 +2728,35 @@ See also:
- [adding static logs](https://docs.victoriametrics.com/victorialogs/logsql/#adding-static-logs)


### json_array_concat pipe

The `<q> | json_array_concat [delimiter] [from <src_field>] [as <result_field>]` [pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) joins items of the JSON array stored in `<src_field>` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) obtained from `<q>` [query](https://docs.victoriametrics.com/victorialogs/logsql/#query-syntax) results into a string using the given `delimiter`, and stores the result into `<result_field>`.

For example, the following query joins items of the `tags` JSON array using `,` as the delimiter and stores the result back into the `tags` field:

```logsql
_time:5m | json_array_concat "," from tags
```

The `as <result_field>` part is optional. If it is missing, then the result is stored in the `<src_field>` specified in `from <src_field>`.
For example, the following query stores the result into a separate `tags_str` field:

```logsql
_time:5m | json_array_concat "," from tags as tags_str
```

The `from <src_field>` part is optional. If it is missing, then the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) is used as the source field.

The `delimiter` is optional. If it is missing, then items are concatenated without any separator.

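If `<src_field>` does not contain a valid JSON array, the result is an empty string. String items are joined without their surrounding quotes, while other items (numbers, booleans, `null`, nested arrays and objects) are joined in their JSON-encoded form.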
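
The joining rules can be approximated outside VictoriaLogs with a short Go sketch. It assumes, based on the implementation in this pull request, that string items are written without quotes and that all other items keep their JSON form. `joinJSONArray` is a hypothetical helper built on the standard `encoding/json` package; it is not part of VictoriaLogs and may differ from the real pipe in edge cases.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
)

// joinJSONArray is a hypothetical stand-in for the json_array_concat pipe.
// It returns an empty string when src is not a valid JSON array.
func joinJSONArray(src, delimiter string) string {
	var items []json.RawMessage
	if err := json.Unmarshal([]byte(src), &items); err != nil {
		return ""
	}
	parts := make([]string, 0, len(items))
	for _, item := range items {
		// String items are decoded, so they lose their quotes and escapes.
		if len(item) > 0 && item[0] == '"' {
			var s string
			if err := json.Unmarshal(item, &s); err == nil {
				parts = append(parts, s)
				continue
			}
		}
		// Other items keep their JSON form, with insignificant whitespace removed.
		var buf bytes.Buffer
		if err := json.Compact(&buf, item); err != nil {
			return ""
		}
		parts = append(parts, buf.String())
	}
	return strings.Join(parts, delimiter)
}

func main() {
	fmt.Printf("%q\n", joinJSONArray(`["a","b","c"]`, ","))          // prints "a,b,c"
	fmt.Printf("%q\n", joinJSONArray(`[1, "x", true, null]`, " | ")) // prints "1 | x | true | null"
	fmt.Printf("%q\n", joinJSONArray(`{"a":1}`, ","))                // prints ""
}
```

The sketch favors readability over speed: it allocates a slice of parts per call, whereas the pipe in this pull request appends directly into a per-shard arena.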

See also:

- [`split` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#split-pipe)
- [`json_array_len` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_len-pipe)
- [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe)

### json_array_len pipe

`<q> | json_array_len(field) as result_field` calculates the length of JSON array at the given [`field`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@@ -3467,6 +3497,7 @@ _time:5m | split "," as items | unroll items | top 5 (items)

See also:

- [`json_array_concat` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe)
- [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe)
- [`unpack_words` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_words-pipe)

1 change: 1 addition & 0 deletions lib/logstorage/pipe.go
@@ -200,6 +200,7 @@ func initPipeParsers() {
"generate_sequence": parsePipeGenerateSequence,
"hash": parsePipeHash,
"join": parsePipeJoin,
"json_array_concat": parsePipeJSONArrayConcat,
"json_array_len": parsePipeJSONArrayLen,
"head": parsePipeLimit,
"keep": parsePipeFields,
213 changes: 213 additions & 0 deletions lib/logstorage/pipe_json_array_concat.go
@@ -0,0 +1,213 @@
package logstorage

import (
	"fmt"

	"github.com/valyala/fastjson"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"

	"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)

// pipeJSONArrayConcat processes '| json_array_concat ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#json_array_concat-pipe
type pipeJSONArrayConcat struct {
	delimiter   string
	fromField   string
	resultField string
}

func (pc *pipeJSONArrayConcat) String() string {
	s := "json_array_concat"
	if pc.delimiter != "" {
		s += " " + quoteTokenIfNeeded(pc.delimiter)
	}
	if !isMsgFieldName(pc.fromField) {
		s += " from " + quoteTokenIfNeeded(pc.fromField)
	}
	if pc.resultField != pc.fromField {
		s += " as " + quoteTokenIfNeeded(pc.resultField)
	}
	return s
}

func (pc *pipeJSONArrayConcat) splitToRemoteAndLocal(_ int64) (pipe, []pipe) {
	return pc, nil
}

func (pc *pipeJSONArrayConcat) canLiveTail() bool {
	return true
}

func (pc *pipeJSONArrayConcat) canReturnLastNResults() bool {
	return pc.resultField != "_time"
}

func (pc *pipeJSONArrayConcat) isFixedOutputFieldsOrder() bool {
	return false
}

func (pc *pipeJSONArrayConcat) updateNeededFields(pf *prefixfilter.Filter) {
	if pf.MatchString(pc.resultField) {
		pf.AddDenyFilter(pc.resultField)
		pf.AddAllowFilter(pc.fromField)
	}
}

func (pc *pipeJSONArrayConcat) hasFilterInWithQuery() bool {
	return false
}

func (pc *pipeJSONArrayConcat) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) {
	return pc, nil
}

func (pc *pipeJSONArrayConcat) visitSubqueries(_ func(q *Query)) {
	// nothing to do
}

func (pc *pipeJSONArrayConcat) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
	pcp := &pipeJSONArrayConcatProcessor{
		pc:     pc,
		ppNext: ppNext,
	}
	pcp.shards.Init = func(shard *pipeJSONArrayConcatProcessorShard) {
		shard.reset()
	}
	return pcp
}

type pipeJSONArrayConcatProcessor struct {
	pc     *pipeJSONArrayConcat
	ppNext pipeProcessor

	shards atomicutil.Slice[pipeJSONArrayConcatProcessorShard]
}

func (plp *pipeJSONArrayConcatProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := plp.shards.Get(workerID)
	shard.rc.name = plp.pc.resultField

	c := br.getColumnByName(plp.pc.fromField)
	delimiter := plp.pc.delimiter
	if c.isConst {
		// Fast path for const column
		v := c.valuesEncoded[0]
		out := shard.concat(v, delimiter)
		shard.rc.addValue(out)
		br.addResultColumnConst(shard.rc)
	} else {
		// Slow path for other columns
		values := c.getValues(br)
		prevOut := ""
		for rowIdx := range values {
			if rowIdx == 0 || values[rowIdx] != values[rowIdx-1] {
				prevOut = shard.concat(values[rowIdx], delimiter)
			}
			shard.rc.addValue(prevOut)
		}
		br.addResultColumn(shard.rc)
	}

	plp.ppNext.writeBlock(workerID, br)

	shard.reset()
}

type pipeJSONArrayConcatProcessorShard struct {
	a  arena
	rc resultColumn
}

func (shard *pipeJSONArrayConcatProcessorShard) reset() {
	shard.a.reset()
	shard.rc.reset()
}

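// concat joins items of the JSON array stored in arrayStr using the given delimiter.
//
// String items are appended without quotes; other items are appended in their JSON-encoded form.
// An empty string is returned if arrayStr isn't a valid non-empty JSON array.
//
// The returned string points to the shard arena, so it is valid until shard.reset() is called.
func (shard *pipeJSONArrayConcatProcessorShard) concat(arrayStr, delimiter string) string {
	arrayStr = trimJSONWhitespace(arrayStr)
	if len(arrayStr) == 0 || arrayStr[0] != '[' {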
		return ""
	}

	p := jspp.Get()
	defer jspp.Put(p)

	jsv, err := p.Parse(arrayStr)
	if err != nil {
		return ""
	}
	jsa, err := jsv.Array()
	if err != nil || len(jsa) == 0 {
		return ""
	}

	bLen := len(shard.a.b)
	for i, item := range jsa {
		if i > 0 {
			shard.a.b = append(shard.a.b, delimiter...)
		}
		if item.Type() == fastjson.TypeString {
			sb, _ := item.StringBytes()
			shard.a.b = append(shard.a.b, sb...)
		} else {
			shard.a.b = item.MarshalTo(shard.a.b)
		}
	}
	return bytesutil.ToUnsafeString(shard.a.b[bLen:])
}

func (plp *pipeJSONArrayConcatProcessor) flush() error {
	return nil
}

func parsePipeJSONArrayConcat(lex *lexer) (pipe, error) {
	if !lex.isKeyword("json_array_concat") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "json_array_concat")
	}
	lex.nextToken()

	delimiter := ""
	if !lex.isKeyword("from", "as", "|", ")", "") {
		s, err := lex.nextCompoundToken()
		if err != nil {
			return nil, fmt.Errorf("cannot parse delimiter for 'json_array_concat': %w", err)
		}
		delimiter = s
	}

	fromField := "_msg"
	if lex.isKeyword("from") {
		lex.nextToken()
		f, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse 'from' field for 'json_array_concat': %w", err)
		}
		fromField = f
	}

	resultField := fromField
	if lex.isKeyword("as") {
		lex.nextToken()
	}
	if !lex.isKeyword("|", ")", "") {
		f, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse result field for 'json_array_concat': %w", err)
		}
		resultField = f
	}

	return &pipeJSONArrayConcat{
		delimiter:   delimiter,
		fromField:   fromField,
		resultField: resultField,
	}, nil
}
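
The slow path in `writeBlock` recomputes the joined string only when a row's value differs from the previous row's value, which avoids repeated JSON parsing for runs of identical values. A minimal standalone sketch of that idea follows; `mapReusingRuns` is a hypothetical helper written for illustration and does not exist in VictoriaLogs.

```go
package main

import (
	"fmt"
	"strings"
)

// mapReusingRuns applies f to every value, but calls f only when the value
// differs from the previous one. Runs of equal values reuse the last result.
// This is a hypothetical helper mirroring the loop in writeBlock.
func mapReusingRuns(values []string, f func(string) string) []string {
	out := make([]string, 0, len(values))
	prev := ""
	for i, v := range values {
		if i == 0 || v != values[i-1] {
			prev = f(v)
		}
		out = append(out, prev)
	}
	return out
}

func main() {
	calls := 0
	upper := func(s string) string {
		calls++
		return strings.ToUpper(s)
	}
	out := mapReusingRuns([]string{"a", "a", "b", "b", "b", "a"}, upper)
	fmt.Println(out, calls) // prints "[A A B B B A] 3"
}
```

Only adjacent duplicates are reused, so the final `"a"` triggers a third call. This keeps the check to a single string comparison per row with no extra memory, at the cost of missing duplicates that are not next to each other.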