Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion app/vlinsert/opentelemetry/opentelemetry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package opentelemetry

import (
"flag"
"fmt"
"net/http"
"time"
Expand All @@ -14,7 +15,10 @@ import (
"github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage"
)

var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
var (
maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request")
enableFieldPrefixes = flag.Bool("opentelemetry.enableFieldPrefixes", false, "When enabled, prefixes OpenTelemetry fields by source to prevent name collisions: 'resource.*', 'attributes.*', 'body.*'. Disabled by default.")
)

// RequestHandler processes Opentelemetry insert requests
func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
Expand Down
147 changes: 147 additions & 0 deletions app/vlinsert/opentelemetry/opentelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,153 @@ func TestPushProtobufRequest(t *testing.T) {
f(data, timestampsExpected, resultsExpected)
}

func TestPushProtobufRequestWithFieldPrefixes(t *testing.T) {
*enableFieldPrefixes = true
defer func() { *enableFieldPrefixes = false }()

f := func(src string, timestampsExpected []int64, resultExpected string) {
t.Helper()

var rls []resourceLogs
dec := json.NewDecoder(strings.NewReader(src))
dec.DisallowUnknownFields()
if err := dec.Decode(&rls); err != nil {
t.Fatalf("unexpected error when parsing JSON: %s", err)
}

lr := logsData{
ResourceLogs: rls,
}

pData := lr.marshalProtobuf(nil)
tlp := &insertutil.TestLogMessageProcessor{}
if err := pushProtobufRequest(pData, tlp, nil, false); err != nil {
t.Fatalf("unexpected error when parsing protobuf data: %s", err)
}

if err := tlp.Verify(timestampsExpected, resultExpected); err != nil {
t.Fatal(err)
}
}

// scalar body still arrives in _msg
data := `[{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": 1234,
"severityNumber": 9,
"body": {"stringValue": "hello world"}
}]
}]
}]`
timestampsExpected := []int64{1234}
resultsExpected := `{"_msg":"hello world","severity_number":"9","severity_text":"Info"}`
f(data, timestampsExpected, resultsExpected)

// resource attributes get "resource." prefix
data = `[{
"resource": {
"attributes": [
{"key":"service.name","value":{"stringValue":"mysvc"}},
{"key":"host","value":{"stringValue":"node1"}}
]
},
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": 1234,
"severityNumber": 9,
"body": {"stringValue": "hello"}
}]
}]
}]`
timestampsExpected = []int64{1234}
resultsExpected = `{"resource.service.name":"mysvc","resource.host":"node1","_msg":"hello","severity_number":"9","severity_text":"Info"}`
f(data, timestampsExpected, resultsExpected)

// logRecord attributes get "attributes." prefix
data = `[{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": 1234,
"severityNumber": 9,
"body": {"stringValue": "hello"},
"attributes": [
{"key":"service.name","value":{"stringValue":"logsvc"}},
{"key":"env","value":{"stringValue":"prod"}}
]
}]
}]
}]`
timestampsExpected = []int64{1234}
resultsExpected = `{"_msg":"hello","attributes.service.name":"logsvc","attributes.env":"prod","severity_number":"9","severity_text":"Info"}`
f(data, timestampsExpected, resultsExpected)

// body KVList fields get "body." prefix
data = `[{
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": 1234,
"severityNumber": 9,
"body": {
"keyValueList": {
"values": [
{"key":"foo","value":{"stringValue":"bar"}},
{"key":"count","value":{"intValue":42}}
]
}
}
}]
}]
}]`
timestampsExpected = []int64{1234}
resultsExpected = `{"body.foo":"bar","body.count":"42","severity_number":"9","severity_text":"Info"}`
f(data, timestampsExpected, resultsExpected)

// collision: same key in resource and logRecord attributes maps to distinct prefixed fields;
// generated fields such as trace_id, span_id, severity_number, and severity_text remain unprefixed
data = `[{
"resource": {
"attributes": [
{"key":"service.name","value":{"stringValue":"resource-svc"}}
]
},
"scopeLogs": [{
"logRecords": [{
"timeUnixNano": 1234,
"severityNumber": 9,
"body": {"stringValue": "msg"},
"attributes": [
{"key":"service.name","value":{"stringValue":"log-svc"}}
],
"traceID": "4bf92f3577b34da6a3ce929d0e0e4736",
"spanID": "00f067aa0ba902b7"
}]
}]
}]`
timestampsExpected = []int64{1234}
resultsExpected = `{"resource.service.name":"resource-svc","_msg":"msg","attributes.service.name":"log-svc","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7","severity_number":"9","severity_text":"Info"}`
f(data, timestampsExpected, resultsExpected)

// scope attributes keep their existing "scope.attributes." prefix regardless of the flag
data = `[{
"scopeLogs": [{
"scope": {
"name": "mylib",
"version": "v1",
"attributes": [{"key":"abc","value":{"stringValue":"xyz"}}]
},
"logRecords": [{
"timeUnixNano": 1234,
"severityNumber": 9,
"body": {"stringValue": "hello"}
}]
}]
}]`
timestampsExpected = []int64{1234}
resultsExpected = `{"scope.name":"mylib","scope.version":"v1","scope.attributes.abc":"xyz","_msg":"hello","severity_number":"9","severity_text":"Info"}`
f(data, timestampsExpected, resultsExpected)
}

var mp easyproto.MarshalerPool

// logsData represents the corresponding OTEL protobuf message.
Expand Down
24 changes: 18 additions & 6 deletions app/vlinsert/opentelemetry/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func decodeResource(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (err error
// repeated KeyValue attributes = 1;
// }

resourcePrefix := ""
if *enableFieldPrefixes {
resourcePrefix = "resource"
}

var fc easyproto.FieldContext
for len(src) > 0 {
src, err = fc.NextField(src)
Expand All @@ -119,7 +124,7 @@ func decodeResource(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (err error
return fmt.Errorf("cannot read Attributes data")
}

if err := decodeKeyValue(data, fs, fb, ""); err != nil {
if err := decodeKeyValue(data, fs, fb, resourcePrefix); err != nil {
return fmt.Errorf("cannot decode Attributes: %w", err)
}
}
Expand Down Expand Up @@ -256,6 +261,13 @@ func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string,
// string event_name = 12;
// }

bodyKeyValueFieldNamePrefix := ""
attributesPrefix := ""
if *enableFieldPrefixes {
bodyKeyValueFieldNamePrefix = "body"
attributesPrefix = "attributes"
}

var (
timeUnixNano uint64
observedTimeUnixNano uint64
Expand Down Expand Up @@ -298,15 +310,15 @@ func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string,
if !ok {
return "", 0, fmt.Errorf("cannot read Body")
}
if err := decodeAnyValue(body, fs, fb, ""); err != nil {
if err := decodeAnyValue(body, fs, fb, "", bodyKeyValueFieldNamePrefix); err != nil {
return "", 0, fmt.Errorf("cannot decode Body: %w", err)
}
case 6:
attributesData, ok := fc.MessageData()
if !ok {
return "", 0, fmt.Errorf("cannot read Attributes data")
}
if err := decodeKeyValue(attributesData, fs, fb, ""); err != nil {
if err := decodeKeyValue(attributesData, fs, fb, attributesPrefix); err != nil {
return "", 0, fmt.Errorf("cannot decode Attributes: %w", err)
}
case 9:
Expand Down Expand Up @@ -380,14 +392,14 @@ func decodeKeyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldNameP
return nil
}

if err := decodeAnyValue(valueData, fs, fb, fieldName); err != nil {
if err := decodeAnyValue(valueData, fs, fb, fieldName, fieldName); err != nil {
return fmt.Errorf("cannot decode AnyValue: %w", err)
}

return nil
}

func decodeAnyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldName string) (err error) {
func decodeAnyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldName string, keyValueListPrefix string) (err error) {
// message AnyValue {
// oneof value {
// string string_value = 1;
Expand Down Expand Up @@ -456,7 +468,7 @@ func decodeAnyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldName
if !ok {
return fmt.Errorf("cannot read KeyValueList")
}
if err := decodeKeyValueList(data, fs, fb, fieldName); err != nil {
if err := decodeKeyValueList(data, fs, fb, keyValueListPrefix); err != nil {
return fmt.Errorf("cannot decode KeyValueList: %w", err)
}
case 7:
Expand Down
61 changes: 61 additions & 0 deletions docs/victorialogs/data-ingestion/opentelemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,67 @@ VictoriaLogs supports other HTTP headers - see the list [here](https://docs.vict

The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/).

## Field prefixes

By default, VictoriaLogs flattens all OpenTelemetry field sources into a single namespace.
This means the following sources can produce fields with identical names:

| Source | Example OTel path | Stored field name (default) |
|---|---|---|
| Resource attributes | `resource.attributes["service.name"]` | `service.name` |
| Log record attributes | `logRecord.attributes["service.name"]` | `service.name` |
| Log record body (KV list) | `logRecord.body["service.name"]` | `service.name` |
| Generated fields | `logRecord.traceID` | `trace_id` |

When two sources share a key, the log entry ends up with **duplicate field names**, which are
handled inconsistently across query and storage paths. This is especially noticeable when the
collision involves resource attributes, because those are used as
[stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) by default.

### Enabling prefixes

Start VictoriaLogs with the `-opentelemetry.enableFieldPrefixes` flag to add a source prefix
before each field is stored:

```sh
victoria-logs -opentelemetry.enableFieldPrefixes
```

With the flag enabled, each source writes to its own sub-namespace:

| Source | Example OTel path | Stored field name (with flag) |
|---|---|---|
| Resource attributes | `resource.attributes["service.name"]` | `resource.service.name` |
| Log record attributes | `logRecord.attributes["service.name"]` | `attributes.service.name` |
| Log record body (KV list) | `logRecord.body["service.name"]` | `body.service.name` |
| Generated fields | `logRecord.traceID` | `trace_id` *(unchanged)* |
| Scope attributes | `scope.attributes["abc"]` | `scope.attributes.abc` *(unchanged)* |

Scalar body values (string, int, bool, …) continue to arrive in the
[`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field, the same as without the flag.

The flag is **disabled by default** to avoid breaking existing ingestion pipelines.
Enable it for new deployments, or for existing ones after updating any queries and dashboards
that reference the old unprefixed field names.

### Updating stream fields

When field prefixes are enabled, resource attributes are stored as `resource.*` fields.
Auto-selection of stream fields continues to work without changes — VictoriaLogs will
use the prefixed names automatically.

If you have an explicit `VL-Stream-Fields` header that references resource attribute names,
update those names to include the `resource.` prefix. For example:

```go
logExporter, err := otlploghttp.New(ctx,
otlploghttp.WithEndpointURL("http://victorialogs:9428/insert/opentelemetry/v1/logs"),
otlploghttp.WithHeaders(map[string]string{
"VL-Stream-Fields": "resource.host,resource.service.name",
}),
)
```

## Collector configuration

VictoriaLogs supports receiving logs from the following OpenTelemetry collectors:
Expand Down
2 changes: 2 additions & 0 deletions docs/victorialogs/victoria_logs_common_flags.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ See the docs at https://docs.victoriametrics.com/victorialogs/
-nativeinsert.maxRequestSize size
The maximum size in bytes of a single request, which can be accepted at /insert/native and /insert/multitenant/native HTTP endpoints
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
-opentelemetry.enableFieldPrefixes
When enabled, prefixes OpenTelemetry fields by source to prevent name collisions: 'resource.*', 'attributes.*', 'body.*'. Disabled by default.
-opentelemetry.maxRequestSize size
The maximum size in bytes of a single OpenTelemetry request
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)
Expand Down