diff --git a/app/vlinsert/opentelemetry/opentelemetry.go b/app/vlinsert/opentelemetry/opentelemetry.go index 308be5ca26..6bf282d8d0 100644 --- a/app/vlinsert/opentelemetry/opentelemetry.go +++ b/app/vlinsert/opentelemetry/opentelemetry.go @@ -1,6 +1,7 @@ package opentelemetry import ( + "flag" "fmt" "net/http" "time" @@ -14,7 +15,10 @@ import ( "github.com/VictoriaMetrics/VictoriaLogs/lib/logstorage" ) -var maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request") +var ( + maxRequestSize = flagutil.NewBytes("opentelemetry.maxRequestSize", 64*1024*1024, "The maximum size in bytes of a single OpenTelemetry request") + enableFieldPrefixes = flag.Bool("opentelemetry.enableFieldPrefixes", false, "When enabled, prefixes OpenTelemetry fields by source to prevent name collisions: 'resource.*', 'attributes.*', 'body.*'. Disabled by default.") +) // RequestHandler processes Opentelemetry insert requests func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { diff --git a/app/vlinsert/opentelemetry/opentelemetry_test.go b/app/vlinsert/opentelemetry/opentelemetry_test.go index 9c8813cb43..579d472fbf 100644 --- a/app/vlinsert/opentelemetry/opentelemetry_test.go +++ b/app/vlinsert/opentelemetry/opentelemetry_test.go @@ -376,6 +376,153 @@ func TestPushProtobufRequest(t *testing.T) { f(data, timestampsExpected, resultsExpected) } +func TestPushProtobufRequestWithFieldPrefixes(t *testing.T) { + *enableFieldPrefixes = true + defer func() { *enableFieldPrefixes = false }() + + f := func(src string, timestampsExpected []int64, resultExpected string) { + t.Helper() + + var rls []resourceLogs + dec := json.NewDecoder(strings.NewReader(src)) + dec.DisallowUnknownFields() + if err := dec.Decode(&rls); err != nil { + t.Fatalf("unexpected error when parsing JSON: %s", err) + } + + lr := logsData{ + ResourceLogs: rls, + } + + pData := lr.marshalProtobuf(nil) + tlp := &insertutil.TestLogMessageProcessor{} + if err := pushProtobufRequest(pData, tlp, nil, false); err != nil { + t.Fatalf("unexpected error when parsing protobuf data: %s", err) + } + + if err := tlp.Verify(timestampsExpected, resultExpected); err != nil { + t.Fatal(err) + } + } + + // scalar body still arrives in _msg + data := `[{ + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": 1234, + "severityNumber": 9, + "body": {"stringValue": "hello world"} + }] + }] + }]` + timestampsExpected := []int64{1234} + resultsExpected := `{"_msg":"hello world","severity_number":"9","severity_text":"Info"}` + f(data, timestampsExpected, resultsExpected) + + // resource attributes get "resource." prefix + data = `[{ + "resource": { + "attributes": [ + {"key":"service.name","value":{"stringValue":"mysvc"}}, + {"key":"host","value":{"stringValue":"node1"}} + ] + }, + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": 1234, + "severityNumber": 9, + "body": {"stringValue": "hello"} + }] + }] + }]` + timestampsExpected = []int64{1234} + resultsExpected = `{"resource.service.name":"mysvc","resource.host":"node1","_msg":"hello","severity_number":"9","severity_text":"Info"}` + f(data, timestampsExpected, resultsExpected) + + // logRecord attributes get "attributes." prefix + data = `[{ + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": 1234, + "severityNumber": 9, + "body": {"stringValue": "hello"}, + "attributes": [ + {"key":"service.name","value":{"stringValue":"logsvc"}}, + {"key":"env","value":{"stringValue":"prod"}} + ] + }] + }] + }]` + timestampsExpected = []int64{1234} + resultsExpected = `{"_msg":"hello","attributes.service.name":"logsvc","attributes.env":"prod","severity_number":"9","severity_text":"Info"}` + f(data, timestampsExpected, resultsExpected) + + // body KVList fields get "body." prefix + data = `[{ + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": 1234, + "severityNumber": 9, + "body": { + "keyValueList": { + "values": [ + {"key":"foo","value":{"stringValue":"bar"}}, + {"key":"count","value":{"intValue":42}} + ] + } + } + }] + }] + }]` + timestampsExpected = []int64{1234} + resultsExpected = `{"body.foo":"bar","body.count":"42","severity_number":"9","severity_text":"Info"}` + f(data, timestampsExpected, resultsExpected) + + // collision: same key in resource and logRecord attributes maps to distinct prefixed fields; + // generated fields such as trace_id, span_id, severity_number, and severity_text remain unprefixed + data = `[{ + "resource": { + "attributes": [ + {"key":"service.name","value":{"stringValue":"resource-svc"}} + ] + }, + "scopeLogs": [{ + "logRecords": [{ + "timeUnixNano": 1234, + "severityNumber": 9, + "body": {"stringValue": "msg"}, + "attributes": [ + {"key":"service.name","value":{"stringValue":"log-svc"}} + ], + "traceID": "4bf92f3577b34da6a3ce929d0e0e4736", + "spanID": "00f067aa0ba902b7" + }] + }] + }]` + timestampsExpected = []int64{1234} + resultsExpected = `{"resource.service.name":"resource-svc","_msg":"msg","attributes.service.name":"log-svc","trace_id":"4bf92f3577b34da6a3ce929d0e0e4736","span_id":"00f067aa0ba902b7","severity_number":"9","severity_text":"Info"}` + f(data, timestampsExpected, resultsExpected) + + // scope attributes keep their existing "scope.attributes." prefix regardless of the flag + data = `[{ + "scopeLogs": [{ + "scope": { + "name": "mylib", + "version": "v1", + "attributes": [{"key":"abc","value":{"stringValue":"xyz"}}] + }, + "logRecords": [{ + "timeUnixNano": 1234, + "severityNumber": 9, + "body": {"stringValue": "hello"} + }] + }] + }]` + timestampsExpected = []int64{1234} + resultsExpected = `{"scope.name":"mylib","scope.version":"v1","scope.attributes.abc":"xyz","_msg":"hello","severity_number":"9","severity_text":"Info"}` + f(data, timestampsExpected, resultsExpected) +} + var mp easyproto.MarshalerPool // logsData represents the corresponding OTEL protobuf message. diff --git a/app/vlinsert/opentelemetry/pb.go b/app/vlinsert/opentelemetry/pb.go index e85ceb5f20..522d3d4346 100644 --- a/app/vlinsert/opentelemetry/pb.go +++ b/app/vlinsert/opentelemetry/pb.go @@ -106,6 +106,11 @@ func decodeResource(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (err error // repeated KeyValue attributes = 1; // } + resourcePrefix := "" + if *enableFieldPrefixes { + resourcePrefix = "resource" + } + var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) @@ -119,7 +124,7 @@ func decodeResource(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (err error return fmt.Errorf("cannot read Attributes data") } - if err := decodeKeyValue(data, fs, fb, ""); err != nil { + if err := decodeKeyValue(data, fs, fb, resourcePrefix); err != nil { return fmt.Errorf("cannot decode Attributes: %w", err) } } @@ -256,6 +261,13 @@ func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string, // string event_name = 12; // } + bodyKeyValueFieldNamePrefix := "" + attributesPrefix := "" + if *enableFieldPrefixes { + bodyKeyValueFieldNamePrefix = "body" + attributesPrefix = "attributes" + } + var ( timeUnixNano uint64 observedTimeUnixNano uint64 @@ -298,7 +310,7 @@ func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string, if !ok { return "", 0, fmt.Errorf("cannot read Body") } - if err := decodeAnyValue(body, fs, fb, ""); err != nil { + if err := decodeAnyValue(body, fs, fb, "", bodyKeyValueFieldNamePrefix); err != nil { return "", 0, fmt.Errorf("cannot decode Body: %w", err) } case 6: @@ -306,7 +318,7 @@ func decodeLogRecord(src []byte, fs *logstorage.Fields, fb *fmtBuffer) (string, if !ok { return "", 0, fmt.Errorf("cannot read Attributes data") } - if err := decodeKeyValue(attributesData, fs, fb, ""); err != nil { + if err := decodeKeyValue(attributesData, fs, fb, attributesPrefix); err != nil { return "", 0, fmt.Errorf("cannot decode Attributes: %w", err) } case 9: @@ -380,14 +392,14 @@ func decodeKeyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldNameP return nil } - if err := decodeAnyValue(valueData, fs, fb, fieldName); err != nil { + if err := decodeAnyValue(valueData, fs, fb, fieldName, fieldName); err != nil { return fmt.Errorf("cannot decode AnyValue: %w", err) } return nil } -func decodeAnyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldName string) (err error) { +func decodeAnyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldName string, keyValueListPrefix string) (err error) { // message AnyValue { // oneof value { // string string_value = 1; @@ -456,7 +468,7 @@ func decodeAnyValue(src []byte, fs *logstorage.Fields, fb *fmtBuffer, fieldName if !ok { return fmt.Errorf("cannot read KeyValueList") } - if err := decodeKeyValueList(data, fs, fb, fieldName); err != nil { + if err := decodeKeyValueList(data, fs, fb, keyValueListPrefix); err != nil { return fmt.Errorf("cannot decode KeyValueList: %w", err) } case 7: diff --git a/docs/victorialogs/data-ingestion/opentelemetry.md b/docs/victorialogs/data-ingestion/opentelemetry.md index 1cab5cd845..7c1e898765 100644 --- a/docs/victorialogs/data-ingestion/opentelemetry.md +++ b/docs/victorialogs/data-ingestion/opentelemetry.md @@ -43,6 +43,67 @@ VictoriaLogs supports other HTTP headers - see the list [here](https://docs.vict The ingested log entries can be queried according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/). +## Field prefixes + +By default, VictoriaLogs flattens all OpenTelemetry field sources into a single namespace. +This means the following sources can produce fields with identical names: + +| Source | Example OTel path | Stored field name (default) | +|---|---|---| +| Resource attributes | `resource.attributes["service.name"]` | `service.name` | +| Log record attributes | `logRecord.attributes["service.name"]` | `service.name` | +| Log record body (KV list) | `logRecord.body["service.name"]` | `service.name` | +| Generated fields | `logRecord.traceID` | `trace_id` | + +When two sources share a key, the log entry ends up with **duplicate field names**, which are +handled inconsistently across query and storage paths. This is especially noticeable when the +collision involves resource attributes, because those are used as +[stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) by default. + +### Enabling prefixes + +Start VictoriaLogs with the `-opentelemetry.enableFieldPrefixes` flag to add a source prefix +before each field is stored: + +```sh +victoria-logs -opentelemetry.enableFieldPrefixes +``` + +With the flag enabled, each source writes to its own sub-namespace: + +| Source | Example OTel path | Stored field name (with flag) | +|---|---|---| +| Resource attributes | `resource.attributes["service.name"]` | `resource.service.name` | +| Log record attributes | `logRecord.attributes["service.name"]` | `attributes.service.name` | +| Log record body (KV list) | `logRecord.body["service.name"]` | `body.service.name` | +| Generated fields | `logRecord.traceID` | `trace_id` *(unchanged)* | +| Scope attributes | `scope.attributes["abc"]` | `scope.attributes.abc` *(unchanged)* | + +Scalar body values (string, int, bool, …) continue to arrive in the +[`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) field, the same as without the flag. + +The flag is **disabled by default** to avoid breaking existing ingestion pipelines. +Enable it for new deployments, or for existing ones after updating any queries and dashboards +that reference the old unprefixed field names. + +### Updating stream fields + +When field prefixes are enabled, resource attributes are stored as `resource.*` fields. +Auto-selection of stream fields continues to work without changes — VictoriaLogs will +use the prefixed names automatically. + +If you have an explicit `VL-Stream-Fields` header that references resource attribute names, +update those names to include the `resource.` prefix. For example: + +```go +logExporter, err := otlploghttp.New(ctx, + otlploghttp.WithEndpointURL("http://victorialogs:9428/insert/opentelemetry/v1/logs"), + otlploghttp.WithHeaders(map[string]string{ + "VL-Stream-Fields": "resource.host,resource.service.name", + }), +) +``` + ## Collector configuration VictoriaLogs supports receiving logs from the following OpenTelemetry collectors: diff --git a/docs/victorialogs/victoria_logs_common_flags.md b/docs/victorialogs/victoria_logs_common_flags.md index e555780482..8f2a7a7d0d 100644 --- a/docs/victorialogs/victoria_logs_common_flags.md +++ b/docs/victorialogs/victoria_logs_common_flags.md @@ -199,6 +199,8 @@ See the docs at https://docs.victoriametrics.com/victorialogs/ -nativeinsert.maxRequestSize size The maximum size in bytes of a single request, which can be accepted at /insert/native and /insert/multitenant/native HTTP endpoints Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864) + -opentelemetry.enableFieldPrefixes + When enabled, prefixes OpenTelemetry fields by source to prevent name collisions: 'resource.*', 'attributes.*', 'body.*'. Disabled by default. -opentelemetry.maxRequestSize size The maximum size in bytes of a single OpenTelemetry request Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 67108864)