Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/pipeline/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package pipeline

import "sync"
import (
"sync"
)

// AsyncControl is an asynchronous execution control that can be canceled.
type AsyncControl struct {
Expand Down
8 changes: 7 additions & 1 deletion pluginmanager/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,17 @@ var containerConfigJSON = `{
]
}`

func panicRecover(pluginType string) {
// panicRecover recovers from a panic in the calling goroutine, logs it as a
// PLUGIN_RUNTIME_ALARM together with a stack dump, and then runs the optional
// callbacks.
//
// It has to be installed directly with defer (for example
// `defer panicRecover(name)`), because recover only has an effect when it is
// called by the deferred function itself.
//
// pluginType labels the plugin in the alarm log. callBacks are invoked in
// order, and only when a panic was actually recovered; nil entries are skipped.
func panicRecover(pluginType string, callBacks ...func()) {
	if err := recover(); err != nil {
		trace := make([]byte, 2048)
		// runtime.Stack reports how many bytes it wrote; log only that prefix
		// so the message never carries the unused, zero-filled tail of the
		// buffer.
		n := runtime.Stack(trace, true)
		logger.Error(context.Background(), "PLUGIN_RUNTIME_ALARM", "plugin", pluginType, "panicked", err, "stack", string(trace[:n]))

		for _, cb := range callBacks {
			if cb != nil {
				cb()
			}
		}
	}
}

Expand Down
20 changes: 18 additions & 2 deletions pluginmanager/plugin_runner_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,31 @@ import (
// timerRunner bundles the settings that Run uses to execute a task for one
// plugin instance.
type timerRunner struct {
// initialMaxDelay is the configured delay before the first run. When it is
// larger than interval, Run logs that interval is used instead.
initialMaxDelay time.Duration
// interval is logged by Run as the task interval; presumably the period
// between task invocations (confirm against the rest of Run).
interval time.Duration
// rerunIfPanic, when true, makes Run's panic callback schedule Run again on
// the AsyncControl after a panic has been recovered.
rerunIfPanic bool
// context supplies the runtime context that Run passes to the logger.
context pipeline.Context
// state is the plugin object this runner works for: Run logs its type at
// start and uses fmt.Sprint(state) to label panic alarms. Presumably it is
// also handed to the task as its state argument (confirm).
state interface{}
}

func (p *timerRunner) Run(task func(state interface{}) error, cc *pipeline.AsyncControl) {
logger.Info(p.context.GetRuntimeContext(), "task run", "start", "interval", p.interval, "max delay", p.initialMaxDelay, "state", fmt.Sprintf("%T", p.state))
defer panicRecover(fmt.Sprint(p.state))

exitFlag := false

panicCallback := func() {
if exitFlag {
logger.Infof(p.context.GetRuntimeContext(), "task exited by async control")
return
}

if p.rerunIfPanic {
logger.Warningf(p.context.GetRuntimeContext(), "TASK_PANIC_RECOVER", "task run panic, rerun after %v", p.initialMaxDelay)
cc.Run(func(cc *pipeline.AsyncControl) {
p.Run(task, cc)
})
return
}
}
defer panicRecover(fmt.Sprint(p.state), panicCallback)

if p.initialMaxDelay > 0 {
if p.initialMaxDelay > p.interval {
logger.Infof(p.context.GetRuntimeContext(), "initial collect delay is larger than than interval, use interval %v instead", p.interval)
Expand Down
23 changes: 23 additions & 0 deletions pluginmanager/plugin_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pluginmanager

import (
"context"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -99,3 +100,25 @@ func (s *pluginRunnerTestSuite) TestTimerRunner_WithInitialDelay() {
cc.WaitCancel()
s.Equal(2, len(ch))
}

// TestTimerRunner_WithPanicRecover runs a task that panics on every invocation
// through a timerRunner with rerunIfPanic enabled, and expects the task to have
// been invoked at least five times within five seconds.
func (s *pluginRunnerTestSuite) TestTimerRunner_WithPanicRecover() {
	var invocations int64

	// The task records that it was called and then always panics.
	panickingTask := func(_ interface{}) error {
		atomic.AddInt64(&invocations, 1)
		panic("test panic")
	}

	tr := &timerRunner{
		state:           s,
		context:         s.Context,
		initialMaxDelay: time.Second,
		interval:        1000 * time.Millisecond,
		rerunIfPanic:    true,
	}

	control := pipeline.NewAsyncControl()
	control.Run(func(ac *pipeline.AsyncControl) {
		tr.Run(panickingTask, ac)
	})

	// Give the runner time to go through several panic/rerun cycles before
	// stopping it.
	time.Sleep(5 * time.Second)
	control.WaitCancel()

	s.GreaterOrEqual(atomic.LoadInt64(&invocations), int64(5))
}
1 change: 1 addition & 0 deletions pluginmanager/plugin_runner_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (p *pluginv1Runner) runMetricInput(async *pipeline.AsyncControl) {
m := metric
runner := &timerRunner{
initialMaxDelay: time.Duration(p.LogstoreConfig.GlobalConfig.InputMaxFirstCollectDelayMs) * time.Millisecond,
rerunIfPanic: true,
state: m.Input,
interval: m.Interval,
context: m.Config.Context,
Expand Down
7 changes: 6 additions & 1 deletion pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (p *pluginv2Runner) addMetricInput(pluginMeta *pipeline.PluginMeta, input p
p.MetricPlugins = append(p.MetricPlugins, &wrapper)
p.TimerRunner = append(p.TimerRunner, &timerRunner{
initialMaxDelay: time.Duration(p.LogstoreConfig.GlobalConfig.InputMaxFirstCollectDelayMs) * time.Millisecond,
rerunIfPanic: true,
interval: wrapper.Interval,
state: input,
context: p.LogstoreConfig.Context,
Expand Down Expand Up @@ -193,6 +194,7 @@ func (p *pluginv2Runner) addAggregator(pluginMeta *pipeline.PluginMeta, aggregat
p.AggregatorPlugins = append(p.AggregatorPlugins, &wrapper)
p.TimerRunner = append(p.TimerRunner, &timerRunner{
state: aggregator,
rerunIfPanic: true,
initialMaxDelay: wrapper.Interval,
interval: wrapper.Interval,
context: p.LogstoreConfig.Context,
Expand Down Expand Up @@ -241,7 +243,10 @@ func (p *pluginv2Runner) runMetricInput(control *pipeline.AsyncControl) {
}, cc)
})
} else {
logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "METRIC_INPUT_V2_START_FAILURE", "type assertion", "failure")
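// Aggregators are also wrapped in a timerRunner (see addAggregator), so a
// failed metric-input assertion is expected for them and is not an error.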
if _, ok := t.state.(pipeline.AggregatorV2); !ok {
logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "METRIC_INPUT_V2_START_FAILURE", "type assertion", "failure")
}
}
}
}
Expand Down
Loading