From ee497e9322cbc737874fdfec1e29f911d99d59b7 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Thu, 11 Sep 2025 18:58:10 +0800 Subject: [PATCH] rerun if metric input and aggregator panic --- pkg/pipeline/control.go | 4 +++- pluginmanager/plugin_manager.go | 8 +++++++- pluginmanager/plugin_runner_helper.go | 20 ++++++++++++++++++-- pluginmanager/plugin_runner_test.go | 23 +++++++++++++++++++++++ pluginmanager/plugin_runner_v1.go | 1 + pluginmanager/plugin_runner_v2.go | 7 ++++++- 6 files changed, 58 insertions(+), 5 deletions(-) diff --git a/pkg/pipeline/control.go b/pkg/pipeline/control.go index f839d7d8ec..6f8839f542 100644 --- a/pkg/pipeline/control.go +++ b/pkg/pipeline/control.go @@ -14,7 +14,9 @@ package pipeline -import "sync" +import ( + "sync" +) // AsyncControl is an asynchronous execution control that can be canceled. type AsyncControl struct { diff --git a/pluginmanager/plugin_manager.go b/pluginmanager/plugin_manager.go index 3d5cd2cc4b..da8cbd64b1 100644 --- a/pluginmanager/plugin_manager.go +++ b/pluginmanager/plugin_manager.go @@ -88,11 +88,17 @@ var containerConfigJSON = `{ ] }` -func panicRecover(pluginType string) { +func panicRecover(pluginType string, callBacks ...func()) { if err := recover(); err != nil { trace := make([]byte, 2048) runtime.Stack(trace, true) logger.Error(context.Background(), "PLUGIN_RUNTIME_ALARM", "plugin", pluginType, "panicked", err, "stack", string(trace)) + + for _, cb := range callBacks { + if cb != nil { + cb() + } + } } } diff --git a/pluginmanager/plugin_runner_helper.go b/pluginmanager/plugin_runner_helper.go index 147bd638e1..a7bde91911 100644 --- a/pluginmanager/plugin_runner_helper.go +++ b/pluginmanager/plugin_runner_helper.go @@ -28,15 +28,31 @@ import ( type timerRunner struct { initialMaxDelay time.Duration interval time.Duration + rerunIfPanic bool context pipeline.Context state interface{} } func (p *timerRunner) Run(task func(state interface{}) error, cc *pipeline.AsyncControl) { 
logger.Info(p.context.GetRuntimeContext(), "task run", "start", "interval", p.interval, "max delay", p.initialMaxDelay, "state", fmt.Sprintf("%T", p.state)) - defer panicRecover(fmt.Sprint(p.state)) - exitFlag := false + + panicCallback := func() { + if exitFlag { + logger.Infof(p.context.GetRuntimeContext(), "task exited by async control") + return + } + + if p.rerunIfPanic { + logger.Warningf(p.context.GetRuntimeContext(), "TASK_PANIC_RECOVER", "task run panic, rerun after %v", p.initialMaxDelay) + cc.Run(func(cc *pipeline.AsyncControl) { + p.Run(task, cc) + }) + return + } + } + defer panicRecover(fmt.Sprint(p.state), panicCallback) + if p.initialMaxDelay > 0 { if p.initialMaxDelay > p.interval { logger.Infof(p.context.GetRuntimeContext(), "initial collect delay is larger than than interval, use interval %v instead", p.interval) diff --git a/pluginmanager/plugin_runner_test.go b/pluginmanager/plugin_runner_test.go index 9799758941..b0cbff7390 100644 --- a/pluginmanager/plugin_runner_test.go +++ b/pluginmanager/plugin_runner_test.go @@ -16,6 +16,7 @@ package pluginmanager import ( "context" + "sync/atomic" "testing" "time" @@ -99,3 +100,25 @@ func (s *pluginRunnerTestSuite) TestTimerRunner_WithInitialDelay() { cc.WaitCancel() s.Equal(2, len(ch)) } + +func (s *pluginRunnerTestSuite) TestTimerRunner_WithPanicRecover() { + runner := &timerRunner{ + state: s, + initialMaxDelay: time.Second, + interval: time.Millisecond * 1000, + rerunIfPanic: true, context: s.Context, + } + cc := pipeline.NewAsyncControl() + count := int64(0) + + cc.Run(func(cc *pipeline.AsyncControl) { + runner.Run(func(state interface{}) error { + atomic.AddInt64(&count, 1) + panic("test panic") + }, cc) + }) + + time.Sleep(time.Second * 5) + cc.WaitCancel() + s.GreaterOrEqual(atomic.LoadInt64(&count), int64(5)) +} diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index be0c9554d8..acd9bb6b0d 100644 --- a/pluginmanager/plugin_runner_v1.go +++ 
b/pluginmanager/plugin_runner_v1.go @@ -209,6 +209,7 @@ func (p *pluginv1Runner) runMetricInput(async *pipeline.AsyncControl) { m := metric runner := &timerRunner{ initialMaxDelay: time.Duration(p.LogstoreConfig.GlobalConfig.InputMaxFirstCollectDelayMs) * time.Millisecond, + rerunIfPanic: true, state: m.Input, interval: m.Interval, context: m.Config.Context, diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index d4fc2d6e28..946f12babe 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -157,6 +157,7 @@ func (p *pluginv2Runner) addMetricInput(pluginMeta *pipeline.PluginMeta, input p p.MetricPlugins = append(p.MetricPlugins, &wrapper) p.TimerRunner = append(p.TimerRunner, &timerRunner{ initialMaxDelay: time.Duration(p.LogstoreConfig.GlobalConfig.InputMaxFirstCollectDelayMs) * time.Millisecond, + rerunIfPanic: true, interval: wrapper.Interval, state: input, context: p.LogstoreConfig.Context, @@ -193,6 +194,7 @@ func (p *pluginv2Runner) addAggregator(pluginMeta *pipeline.PluginMeta, aggregat p.AggregatorPlugins = append(p.AggregatorPlugins, &wrapper) p.TimerRunner = append(p.TimerRunner, &timerRunner{ state: aggregator, + rerunIfPanic: true, initialMaxDelay: wrapper.Interval, interval: wrapper.Interval, context: p.LogstoreConfig.Context, @@ -241,7 +243,10 @@ func (p *pluginv2Runner) runMetricInput(control *pipeline.AsyncControl) { }, cc) }) } else { - logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "METRIC_INPUT_V2_START_FAILURE", "type assertion", "failure") + // Aggregator also wrapped by timerRunner + if _, ok := t.state.(pipeline.AggregatorV2); !ok { + logger.Error(p.LogstoreConfig.Context.GetRuntimeContext(), "METRIC_INPUT_V2_START_FAILURE", "type assertion", "failure") + } } } }