diff --git a/trace/config.go b/trace/config.go index d5473a798..8ac8fae3e 100644 --- a/trace/config.go +++ b/trace/config.go @@ -16,6 +16,18 @@ package trace import "go.opencensus.io/trace/internal" +// Default limits for the number of attributes, message events and links on each span +// in order to prevent unbounded memory increase for long-running spans. +// These defaults can be overriden with trace.ApplyConfig. +// These defaults can also be overriden per-span by using trace.StartOptions +// when creating a new span. +// TODO: Add an annnoation limit when priorities are implemented. +const ( + DefaultMaxAttributes = 32 + DefaultMaxMessageEvents = 128 + DefaultMaxLinks = 32 +) + // Config represents the global tracing configuration. type Config struct { // DefaultSampler is the default sampler used when creating new spans. @@ -23,12 +35,44 @@ type Config struct { // IDGenerator is for internal use only. IDGenerator internal.IDGenerator + + // The below config options must be set with a GlobalOption. + // maxAttributes sets a global limit on the number of attributes. + maxAttributes int + // maxMessageEvents sets a global limit on the number of message events. + maxMessageEvents int + // maxLinks sets a global limit on the number of links. + maxLinks int +} + +// GlobalOption apply changes to global tracing configuration. +type GlobalOption func(*Config) + +// WithDefaultMaxAttributes sets the default limit on the number of attributes. +func WithDefaultMaxAttributes(limit int) GlobalOption { + return func(c *Config) { + c.maxAttributes = limit + } +} + +// WithDefaultMaxMessageEvents sets the default limit on the number of message events. +func WithDefaultMaxMessageEvents(limit int) GlobalOption { + return func(c *Config) { + c.maxMessageEvents = limit + } +} + +// WithDefaultMaxLinks sets the default limit on the number of links. +func WithDefaultMaxLinks(limit int) GlobalOption { + return func(c *Config) { + c.maxLinks = limit + } } // ApplyConfig applies changes to the global tracing configuration. // // Fields not provided in the given config are going to be preserved. -func ApplyConfig(cfg Config) { +func ApplyConfig(cfg Config, o ...GlobalOption) { c := *config.Load().(*Config) if cfg.DefaultSampler != nil { c.DefaultSampler = cfg.DefaultSampler @@ -36,5 +80,8 @@ func ApplyConfig(cfg Config) { if cfg.IDGenerator != nil { c.IDGenerator = cfg.IDGenerator } + for _, op := range o { + op(&c) + } config.Store(&c) } diff --git a/trace/export.go b/trace/export.go index c522550fa..4aaf59f0b 100644 --- a/trace/export.go +++ b/trace/export.go @@ -73,4 +73,13 @@ type SpanData struct { Status Links []Link HasRemoteParent bool + + // TODO: Record these drops with stats.Record / stats.Int64. + + // DroppedAttributes contains the number of dropped attributes in this span. + DroppedAttributes int + // DroppedMessageEvents contains the number of dropped message events in this span. + DroppedMessageEvents int + // DroppedLinks contains the number of dropped links in this span. + DroppedLinks int } diff --git a/trace/trace.go b/trace/trace.go index 3e640b745..d155b91a2 100644 --- a/trace/trace.go +++ b/trace/trace.go @@ -46,6 +46,12 @@ type Span struct { endOnce sync.Once executionTracerTaskEnd func() // ends the execution tracer span + + // The maximum limits for internal span data. + // These are set with trace.ApplyConfig and can be overriden by trace.StartOptions. + maxAttributes int + maxMessageEvents int + maxLinks int } // IsRecordingEvents returns true if events are being recorded for this span. @@ -125,6 +131,16 @@ type StartOptions struct { // SpanKind represents the kind of a span. If none is set, // SpanKindUnspecified is used. SpanKind int + + // MaxAttributes sets a span limit on the number of attributes (overrides + // global trace config). + MaxAttributes int + // WithMaxMessageEvents sets a span limit on the number of message events + // (overrides global trace config). + MaxMessageEvents int + // WithMaxLinks sets a span limit on the number of links (overrides + // global trace config). + MaxLinks int } // StartOption apply changes to StartOptions. @@ -145,20 +161,37 @@ func WithSampler(sampler Sampler) StartOption { } } +// WithMaxAttributes sets a span limit on the number of attributes (overrides global trace config). +func WithMaxAttributes(max int) StartOption { + return func(o *StartOptions) { + o.MaxAttributes = max + } +} + +// WithMaxMessageEvents sets a span limit on the number of message events (overrides global trace config). +func WithMaxMessageEvents(max int) StartOption { + return func(o *StartOptions) { + o.MaxMessageEvents = max + } +} + +// WithMaxLinks sets a span limit on the number of links (overrides global trace config). +func WithMaxLinks(max int) StartOption { + return func(o *StartOptions) { + o.MaxLinks = max + } +} + // StartSpan starts a new child span of the current span in the context. If // there is no span in the context, creates a new trace and span. // // Returned context contains the newly created span. You can use it to // propagate the returned span in process. -func StartSpan(ctx context.Context, name string, o ...StartOption) (context.Context, *Span) { - var opts StartOptions +func StartSpan(ctx context.Context, name string, opts ...StartOption) (context.Context, *Span) { var parent SpanContext if p := FromContext(ctx); p != nil { parent = p.spanContext } - for _, op := range o { - op(&opts) - } span := startSpanInternal(name, parent != SpanContext{}, parent, false, opts) ctx, end := startExecutionTracerTask(ctx, name) @@ -173,23 +206,34 @@ func StartSpan(ctx context.Context, name string, o ...StartOption) (context.Cont // // Returned context contains the newly created span. You can use it to // propagate the returned span in process. -func StartSpanWithRemoteParent(ctx context.Context, name string, parent SpanContext, o ...StartOption) (context.Context, *Span) { - var opts StartOptions - for _, op := range o { - op(&opts) - } +func StartSpanWithRemoteParent(ctx context.Context, name string, parent SpanContext, opts ...StartOption) (context.Context, *Span) { span := startSpanInternal(name, parent != SpanContext{}, parent, true, opts) ctx, end := startExecutionTracerTask(ctx, name) span.executionTracerTaskEnd = end return NewContext(ctx, span), span } -func startSpanInternal(name string, hasParent bool, parent SpanContext, remoteParent bool, o StartOptions) *Span { - span := &Span{} - span.spanContext = parent - +func startSpanInternal(name string, hasParent bool, parent SpanContext, remoteParent bool, opts []StartOption) *Span { cfg := config.Load().(*Config) + // Load the global config default limits, which can be modified with trace.ApplyConfig. + o := StartOptions{ + MaxAttributes: cfg.maxAttributes, + MaxMessageEvents: cfg.maxMessageEvents, + MaxLinks: cfg.maxLinks, + } + // Now overlay any StartOption overrides (e.g. per-span limits). + for _, op := range opts { + op(&o) + } + + span := &Span{ + spanContext: parent, + maxAttributes: o.MaxAttributes, + maxLinks: o.MaxLinks, + maxMessageEvents: o.MaxMessageEvents, + } + if !hasParent { span.spanContext.TraceID = cfg.IDGenerator.NewTraceID() } @@ -313,22 +357,34 @@ func (s *Span) SetStatus(status Status) { // // Existing attributes whose keys appear in the attributes parameter are overwritten. func (s *Span) AddAttributes(attributes ...Attribute) { - if !s.IsRecordingEvents() { + if !s.IsRecordingEvents() || s.maxAttributes <= 0 { return } s.mu.Lock() if s.data.Attributes == nil { s.data.Attributes = make(map[string]interface{}) } - copyAttributes(s.data.Attributes, attributes) + if drops := copyAttributes(s.data.Attributes, attributes, s.maxAttributes); drops > 0 { + s.data.DroppedAttributes += drops + } s.mu.Unlock() } // copyAttributes copies a slice of Attributes into a map. -func copyAttributes(m map[string]interface{}, attributes []Attribute) { +// It returns how many were dropped (after hitting the given max limit). +func copyAttributes(m map[string]interface{}, attributes []Attribute, max int) int { + var drops int for _, a := range attributes { + if len(m) >= max { + if _, ok := m[a.key]; !ok { + // If the attribute map hit max capacity, only allow existing key updates. + drops++ + continue + } + } m[a.key] = a.value } + return drops } func (s *Span) lazyPrintfInternal(attributes []Attribute, format string, a ...interface{}) { @@ -336,9 +392,9 @@ func (s *Span) lazyPrintfInternal(attributes []Attribute, format string, a ...in msg := fmt.Sprintf(format, a...) var m map[string]interface{} s.mu.Lock() - if len(attributes) != 0 { + if len(attributes) != 0 && s.maxAttributes > 0 { m = make(map[string]interface{}) - copyAttributes(m, attributes) + copyAttributes(m, attributes, s.maxAttributes) } s.data.Annotations = append(s.data.Annotations, Annotation{ Time: now, @@ -352,9 +408,9 @@ func (s *Span) printStringInternal(attributes []Attribute, str string) { now := time.Now() var a map[string]interface{} s.mu.Lock() - if len(attributes) != 0 { + if len(attributes) != 0 && s.maxAttributes > 0 { a = make(map[string]interface{}) - copyAttributes(a, attributes) + copyAttributes(a, attributes, s.maxAttributes) } s.data.Annotations = append(s.data.Annotations, Annotation{ Time: now, @@ -388,11 +444,15 @@ func (s *Span) Annotatef(attributes []Attribute, format string, a ...interface{} // event (this allows to identify a message between the sender and receiver). // For example, this could be a sequence id. func (s *Span) AddMessageSendEvent(messageID, uncompressedByteSize, compressedByteSize int64) { - if !s.IsRecordingEvents() { + if !s.IsRecordingEvents() || s.maxMessageEvents <= 0 { return } now := time.Now() s.mu.Lock() + if l := len(s.data.MessageEvents); l > 0 && l >= s.maxMessageEvents { + s.data.MessageEvents = s.data.MessageEvents[1:] + s.data.DroppedMessageEvents++ + } s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{ Time: now, EventType: MessageEventTypeSent, @@ -410,11 +470,15 @@ func (s *Span) AddMessageSendEvent(messageID, uncompressedByteSize, compressedBy // event (this allows to identify a message between the sender and receiver). // For example, this could be a sequence id. func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compressedByteSize int64) { - if !s.IsRecordingEvents() { + if !s.IsRecordingEvents() || s.maxMessageEvents <= 0 { return } now := time.Now() s.mu.Lock() + if l := len(s.data.MessageEvents); l > 0 && l >= s.maxMessageEvents { + s.data.MessageEvents = s.data.MessageEvents[1:] + s.data.DroppedMessageEvents++ + } s.data.MessageEvents = append(s.data.MessageEvents, MessageEvent{ Time: now, EventType: MessageEventTypeRecv, @@ -427,10 +491,14 @@ func (s *Span) AddMessageReceiveEvent(messageID, uncompressedByteSize, compresse // AddLink adds a link to the span. func (s *Span) AddLink(l Link) { - if !s.IsRecordingEvents() { + if !s.IsRecordingEvents() || s.maxLinks <= 0 { return } s.mu.Lock() + if l := len(s.data.Links); l > 0 && l >= s.maxLinks { + s.data.Links = s.data.Links[1:] + s.data.DroppedLinks++ + } s.data.Links = append(s.data.Links, l) s.mu.Unlock() } @@ -463,8 +531,11 @@ func init() { gen.spanIDInc |= 1 config.Store(&Config{ - DefaultSampler: ProbabilitySampler(defaultSamplingProbability), - IDGenerator: gen, + DefaultSampler: ProbabilitySampler(defaultSamplingProbability), + IDGenerator: gen, + maxAttributes: DefaultMaxAttributes, + maxMessageEvents: DefaultMaxMessageEvents, + maxLinks: DefaultMaxLinks, }) } diff --git a/trace/trace_test.go b/trace/trace_test.go index 234531b97..13805e361 100644 --- a/trace/trace_test.go +++ b/trace/trace_test.go @@ -27,9 +27,18 @@ var ( sid = SpanID{1, 2, 4, 8, 16, 32, 64, 128} ) -func init() { +func resetDefaultTestConfig() { // no random sampling, but sample children of sampled spans. - ApplyConfig(Config{DefaultSampler: ProbabilitySampler(0)}) + ApplyConfig( + Config{DefaultSampler: ProbabilitySampler(0)}, + WithDefaultMaxAttributes(DefaultMaxAttributes), + WithDefaultMaxLinks(DefaultMaxLinks), + WithDefaultMaxMessageEvents(DefaultMaxMessageEvents), + ) +} + +func init() { + resetDefaultTestConfig() } func TestStrings(t *testing.T) { @@ -80,6 +89,69 @@ func TestStartSpan(t *testing.T) { } } +func TestSpanStartOptionOverrides(t *testing.T) { + for _, test := range []struct { + name string + globalOpts []GlobalOption + startOpts []StartOption + wantMaxAttributes int + wantMaxMessageEvents int + wantMaxLinks int + }{ + { + name: "all defaults", + globalOpts: nil, + startOpts: nil, + wantMaxAttributes: DefaultMaxAttributes, + wantMaxMessageEvents: DefaultMaxMessageEvents, + wantMaxLinks: DefaultMaxLinks, + }, + { + name: "all global opts", + globalOpts: []GlobalOption{WithDefaultMaxAttributes(1), WithDefaultMaxMessageEvents(2), WithDefaultMaxLinks(3)}, + startOpts: nil, + wantMaxAttributes: 1, + wantMaxMessageEvents: 2, + wantMaxLinks: 3, + }, + { + name: "all start/span override opts", + globalOpts: nil, + startOpts: []StartOption{WithMaxAttributes(4), WithMaxMessageEvents(5), WithMaxLinks(6)}, + wantMaxAttributes: 4, + wantMaxMessageEvents: 5, + wantMaxLinks: 6, + }, + { + name: "mixed (1 default, 1 global, 1 start/span override)", + globalOpts: []GlobalOption{WithDefaultMaxMessageEvents(7)}, + startOpts: []StartOption{WithMaxLinks(8)}, + wantMaxAttributes: DefaultMaxAttributes, + wantMaxMessageEvents: 7, + wantMaxLinks: 8, + }, + } { + t.Run(test.name, func(t *testing.T) { + ApplyConfig(Config{DefaultSampler: ProbabilitySampler(0)}, test.globalOpts...) + defer resetDefaultTestConfig() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _, span := StartSpan(ctx, "foo", test.startOpts...) + if span.maxAttributes != test.wantMaxAttributes { + t.Errorf("got %d attributes, want %d", span.maxAttributes, test.wantMaxAttributes) + } + if span.maxMessageEvents != test.wantMaxMessageEvents { + t.Errorf("got %d message events, want %d", span.maxMessageEvents, test.wantMaxMessageEvents) + } + if span.maxLinks != test.wantMaxLinks { + t.Errorf("got %d links, want %d", span.maxLinks, test.wantMaxLinks) + } + }) + } +} + func TestSampling(t *testing.T) { for _, test := range []struct { remoteParent bool @@ -168,7 +240,7 @@ func TestSampling(t *testing.T) { } } } - ApplyConfig(Config{DefaultSampler: ProbabilitySampler(0)}) // reset the default sampler. + resetDefaultTestConfig() } func TestProbabilitySampler(t *testing.T) { @@ -355,8 +427,16 @@ func TestSpanKind(t *testing.T) { } func TestSetSpanAttributes(t *testing.T) { + ApplyConfig(Config{DefaultSampler: ProbabilitySampler(0)}, WithDefaultMaxAttributes(2)) + defer resetDefaultTestConfig() + span := startSpan(StartOptions{}) span.AddAttributes(StringAttribute("key1", "value1")) + span.AddAttributes(StringAttribute("key2", "value2")) + span.AddAttributes(StringAttribute("dropped1", "dropped1")) + span.AddAttributes(StringAttribute("dropped2", "dropped2")) + span.AddAttributes(StringAttribute("key1", "updated_value1")) + span.AddAttributes(StringAttribute("key2", "updated_value2")) got, err := endSpan(span) if err != nil { t.Fatal(err) @@ -368,10 +448,11 @@ func TestSetSpanAttributes(t *testing.T) { SpanID: SpanID{}, TraceOptions: 0x1, }, - ParentSpanID: sid, - Name: "span0", - Attributes: map[string]interface{}{"key1": "value1"}, - HasRemoteParent: true, + ParentSpanID: sid, + Name: "span0", + Attributes: map[string]interface{}{"key1": "updated_value1", "key2": "updated_value2"}, + HasRemoteParent: true, + DroppedAttributes: 2, } if !reflect.DeepEqual(got, want) { t.Errorf("exporting span: got %#v want %#v", got, want) @@ -413,7 +494,13 @@ func TestAnnotations(t *testing.T) { } func TestMessageEvents(t *testing.T) { + ApplyConfig(Config{DefaultSampler: ProbabilitySampler(0)}, WithDefaultMaxMessageEvents(2)) + defer resetDefaultTestConfig() + span := startSpan(StartOptions{}) + const dropped1, dropped2 = 123, 456 + span.AddMessageReceiveEvent(dropped1, dropped1, dropped1) + span.AddMessageSendEvent(dropped2, dropped2, dropped2) span.AddMessageReceiveEvent(3, 400, 300) span.AddMessageSendEvent(1, 200, 100) got, err := endSpan(span) @@ -439,7 +526,8 @@ func TestMessageEvents(t *testing.T) { {EventType: 2, MessageID: 0x3, UncompressedByteSize: 0x190, CompressedByteSize: 0x12c}, {EventType: 1, MessageID: 0x1, UncompressedByteSize: 0xc8, CompressedByteSize: 0x64}, }, - HasRemoteParent: true, + HasRemoteParent: true, + DroppedMessageEvents: 2, } if !reflect.DeepEqual(got, want) { t.Errorf("exporting span: got %#v want %#v", got, want) @@ -522,13 +610,34 @@ func TestSetSpanStatus(t *testing.T) { } func TestAddLink(t *testing.T) { + ApplyConfig(Config{DefaultSampler: ProbabilitySampler(0)}, WithDefaultMaxLinks(2)) + defer resetDefaultTestConfig() + span := startSpan(StartOptions{}) + span.AddLink(Link{ + TraceID: tid, + SpanID: sid, + Type: LinkTypeParent, + Attributes: map[string]interface{}{"dropped1": "dropped1"}, + }) + span.AddLink(Link{ + TraceID: tid, + SpanID: sid, + Type: LinkTypeParent, + Attributes: map[string]interface{}{"dropped2": "dropped2"}, + }) span.AddLink(Link{ TraceID: tid, SpanID: sid, Type: LinkTypeParent, Attributes: map[string]interface{}{"key5": "value5"}, }) + span.AddLink(Link{ + TraceID: tid, + SpanID: sid, + Type: LinkTypeParent, + Attributes: map[string]interface{}{"key6": "value6"}, + }) got, err := endSpan(span) if err != nil { t.Fatal(err) @@ -542,13 +651,22 @@ func TestAddLink(t *testing.T) { }, ParentSpanID: sid, Name: "span0", - Links: []Link{{ - TraceID: tid, - SpanID: sid, - Type: 2, - Attributes: map[string]interface{}{"key5": "value5"}, - }}, + Links: []Link{ + { + TraceID: tid, + SpanID: sid, + Type: 2, + Attributes: map[string]interface{}{"key5": "value5"}, + }, + { + TraceID: tid, + SpanID: sid, + Type: 2, + Attributes: map[string]interface{}{"key6": "value6"}, + }, + }, HasRemoteParent: true, + DroppedLinks: 2, } if !reflect.DeepEqual(got, want) { t.Errorf("exporting span: got %#v want %#v", got, want)