diff --git a/contrib/rpc/grpcx/grpcx_gapp.go b/contrib/rpc/grpcx/grpcx_gapp.go new file mode 100644 index 00000000000..bf4463f2a08 --- /dev/null +++ b/contrib/rpc/grpcx/grpcx_gapp.go @@ -0,0 +1,66 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Package grpcx provides gapp Server adapter for GrpcServer. +package grpcx + +import ( + "time" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/gapp" +) + +// adapterStartTimeout is the maximum duration to wait for a gRPC server +// to indicate it is listening after Start() is called. +const grpcAdapterStartTimeout = time.Second * 2 + +// adapterStartPollInterval is the interval between readiness polls. +const grpcAdapterStartPollInterval = time.Millisecond * 10 + +// GrpcServerAdapter wraps GrpcServer to implement the gapp.Server interface. +type GrpcServerAdapter struct { + server *GrpcServer +} + +// NewGappServerAdapter creates and returns a gapp.Server adapter for GrpcServer. +func NewGappServerAdapter(server *GrpcServer) gapp.Server { + return &GrpcServerAdapter{server: server} +} + +// Start starts the gRPC server in non-blocking way without registering signal handlers. +func (a *GrpcServerAdapter) Start() error { + if err := a.server.StartManaged(); err != nil { + return gerror.WrapCode(gcode.CodeInternalError, err, "grpc server start failed") + } + + // Poll until the listener is established or timeout. + deadline := time.Now().Add(grpcAdapterStartTimeout) + for time.Now().Before(deadline) { + if a.server.GetListenedPort() > 0 { + return nil + } + time.Sleep(grpcAdapterStartPollInterval) + } + // serve() succeeded but we cannot confirm readiness; clean up the running server. + a.server.StopForceful() + return gerror.NewCode(gcode.CodeOperationFailed, "grpc server failed to start within timeout") +} + +// Stop stops the gRPC server. +// When graceful is true, it waits for in-flight RPCs to complete. +// When graceful is false, it forcibly stops the server immediately. +// Note: GrpcServer.Stop/StopForceful do not return errors, so this +// method always returns nil. +func (a *GrpcServerAdapter) Stop(graceful bool) error { + if graceful { + a.server.Stop() + } else { + a.server.StopForceful() + } + return nil +} diff --git a/contrib/rpc/grpcx/grpcx_gapp_z_unit_test.go b/contrib/rpc/grpcx/grpcx_gapp_z_unit_test.go new file mode 100644 index 00000000000..dd743049053 --- /dev/null +++ b/contrib/rpc/grpcx/grpcx_gapp_z_unit_test.go @@ -0,0 +1,82 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Unit tests for gapp.Server adapter wrapping GrpcServer. +package grpcx_test + +import ( + "testing" + + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/util/guid" + + "github.com/gogf/gf/contrib/rpc/grpcx/v2" + "github.com/gogf/gf/contrib/rpc/grpcx/v2/testdata/controller" +) + +func Test_Grpcx_GappServerAdapter_ImplementsGappServer(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + c := grpcx.Server.NewConfig() + c.Name = guid.S() + s := grpcx.Server.New(c) + var _ gapp.Server = grpcx.NewGappServerAdapter(s) + }) +} + +func Test_Grpcx_StartManagedDoesNotRegisterSignalHandler(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + c := grpcx.Server.NewConfig() + c.Name = guid.S() + s := grpcx.Server.New(c) + controller.Register(s) + + err := s.StartManaged() + t.AssertNil(err) + t.Assert(s.GetListenedPort() > 0, true) + + err = s.StartManaged() + t.AssertNE(err, nil) + + s.StopForceful() + }) +} + +func Test_Grpcx_GappServerAdapter_StartGracefulStop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + c := grpcx.Server.NewConfig() + c.Name = guid.S() + s := grpcx.Server.New(c) + controller.Register(s) + + ad := grpcx.NewGappServerAdapter(s) + + err := ad.Start() + t.AssertNil(err) + t.Assert(s.GetListenedPort() > 0, true) + + err = ad.Stop(true) + t.AssertNil(err) + }) +} + +func Test_Grpcx_GappServerAdapter_StopForceful(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + c := grpcx.Server.NewConfig() + c.Name = guid.S() + s := grpcx.Server.New(c) + controller.Register(s) + + ad := grpcx.NewGappServerAdapter(s) + + err := ad.Start() + t.AssertNil(err) + t.Assert(s.GetListenedPort() > 0, true) + + err = ad.Stop(false) + t.AssertNil(err) + }) +} diff --git a/contrib/rpc/grpcx/grpcx_grpc_server.go b/contrib/rpc/grpcx/grpcx_grpc_server.go index c022e6ea95a..372c64a38b0 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server.go @@ -30,13 +30,14 @@ import ( // GrpcServer is the server for GRPC protocol. type GrpcServer struct { - Server *grpc.Server - config *GrpcServerConfig - listener net.Listener - services []gsvc.Service - waitGroup sync.WaitGroup - registrar gsvc.Registrar - serviceMu sync.Mutex + Server *grpc.Server + config *GrpcServerConfig + listenerMu sync.RWMutex + listener net.Listener + services []gsvc.Service + waitGroup sync.WaitGroup + registrar gsvc.Registrar + serviceMu sync.Mutex } // Service implements gsvc.Service interface. @@ -95,35 +96,52 @@ func (s *GrpcServer) Service(services ...gsvc.Service) { s.services = append(s.services, services...) } -// Run starts the server in blocking way. -func (s *GrpcServer) Run() { - var ( - err error - ctx = gctx.GetInitCtx() - ) - // Create listener to bind listening ip and port. - s.listener, err = net.Listen("tcp", s.config.Address) +// serve binds the listener, starts serving asynchronously, and registers services. +// It does not block on OS signal handling and is intended for external lifecycle managers. +func (s *GrpcServer) serve() error { + ctx := gctx.GetInitCtx() + + s.listenerMu.Lock() + if s.listener != nil { + s.listenerMu.Unlock() + return gerror.NewCode(gcode.CodeInvalidOperation, "grpc server already started") + } + listener, err := net.Listen("tcp", s.config.Address) if err != nil { - s.Logger().Fatalf(ctx, `%+v`, err) + s.listenerMu.Unlock() + return err } + s.listener = listener + s.listenerMu.Unlock() - // Start listening. - go s.doServeAsynchronously(ctx) - - // Service register. + go s.doServeAsynchronously(ctx, listener) s.doServiceRegister() s.Logger().Infof( ctx, "pid[%d]: grpc server started listening on [%s]", gproc.Pid(), s.GetListenedAddress(), ) - s.doSignalListen() + return nil } -func (s *GrpcServer) doServeAsynchronously(ctx context.Context) { - if err := s.Server.Serve(s.listener); err != nil { +// Run starts the server in blocking way. +func (s *GrpcServer) Run() { + ctx := gctx.GetInitCtx() + if err := s.serve(); err != nil { s.Logger().Fatalf(ctx, `%+v`, err) } + s.doSignalListen() +} + +// StartManaged starts serving under external lifecycle management without signal handling. +func (s *GrpcServer) StartManaged() error { + return s.serve() +} + +func (s *GrpcServer) doServeAsynchronously(ctx context.Context, listener net.Listener) { + if err := s.Server.Serve(listener); err != nil && err != grpc.ErrServerStopped { + s.Logger().Errorf(ctx, `grpc server serve error: %+v`, err) + } } // doSignalListen does signal listening and handling for gracefully shutdown. @@ -225,6 +243,12 @@ func (s *GrpcServer) Stop() { s.Server.GracefulStop() } +// StopForceful forcibly stops the server and de-registers services from the registry. +func (s *GrpcServer) StopForceful() { + s.doServiceDeregister() + s.Server.Stop() +} + // GetConfig returns the configuration of current Server. func (s *GrpcServer) GetConfig() *GrpcServerConfig { return s.config @@ -245,6 +269,8 @@ func (s *GrpcServer) GetListenedAddress() string { // GetListenedPort retrieves and returns one port which is listened to by current server. func (s *GrpcServer) GetListenedPort() int { + s.listenerMu.RLock() + defer s.listenerMu.RUnlock() if ln := s.listener; ln != nil { return ln.Addr().(*net.TCPAddr).Port } diff --git a/contrib/scheduler/gjob/README.md b/contrib/scheduler/gjob/README.md new file mode 100644 index 00000000000..6a388e313a1 --- /dev/null +++ b/contrib/scheduler/gjob/README.md @@ -0,0 +1,230 @@ +# gjob + +Background job servers for GoFrame application lifecycle management. + +## Introduction + +The `gjob` package provides two `gapp.Server` implementations for background job processing: + +- **WorkerServer**: manages long-running background worker tasks that run in their own goroutines +- **CronServer**: manages scheduled cron tasks that run on specified intervals + +Both server types implement the `gapp.Server` interface and can be registered with `gapp.App` for unified lifecycle management. + +## Configuration-Based Setup + +The `NewServersFromConfig` helper reads job configurations from the application config file and creates the appropriate server instances. This is the recommended way to set up job servers when you want to decouple task definitions from handler code. + +### Configuration Format + +```yaml +scheduler: + job: + - name: my-worker # Unique task name + type: worker # Task type: "worker" or "cron" + enable: true # Whether to enable this task + - name: my-cron # Unique task name + type: cron # Task type: "worker" or "cron" + enable: true # Whether to enable this task + spec: "*/2 * * * * *" # Cron expression (cron type only) +``` + +### Usage + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + handlers := gjob.HandlerMap{ + "my-worker": gjob.WorkerHandler(func(ctx context.Context) func() { + go doBackgroundWork(ctx) + return func() { + g.Log().Info(ctx, "worker cleaned up") + } + }), + "my-cron": gjob.CronHandler(func(ctx context.Context) error { + return syncData(ctx) + }), + } + + jobServers := gjob.NewServersFromConfig(context.Background(), handlers) + app := g.App(gapp.NewHTTPServerAdapter(httpServer)) + app.Add(jobServers...) + app.Run(gctx.GetInitCtx()) +} +``` + +Tasks that are disabled (`enable: false`) or have no matching handler in the `HandlerMap` are automatically skipped. + +## WorkerServer + +WorkerServer manages background tasks that run in their own goroutines. Each task receives a context that is cancelled when the server stops, and can return an optional cleanup function. + +### WorkerHandler + +```go +type WorkerHandler func(ctx context.Context) func() +``` + +The handler receives a context and returns an optional cleanup function. The cleanup is called after the context is cancelled during server shutdown. + +### Usage + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + workerSrv := gjob.NewWorkerServer(gctx.GetInitCtx(), + gjob.WorkerTask{ + Name: "order-worker", + Handler: func(ctx context.Context) func() { + // Start background work. + go processOrders(ctx) + return func() { + // Cleanup resources on shutdown. + g.Log().Info(ctx, "order worker cleaned up") + } + }, + }, + ) + + app := g.App(workerSrv) + app.Run(gctx.GetInitCtx()) +} +``` + +## CronServer + +CronServer manages tasks that run on a cron schedule using `gcron`. Tasks are registered as singleton jobs, meaning concurrent invocations are skipped if the previous one is still running. + +### CronHandler + +```go +type CronHandler func(ctx context.Context) error +``` + +### Usage + +```go +package main + +import ( + "context" + + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + cronSrv := gjob.NewCronServer(gctx.GetInitCtx(), + gjob.CronTask{ + Name: "data-sync", + Spec: "0 */5 * * * *", // Every 5 minutes + Handler: func(ctx context.Context) error { + return syncData(ctx) + }, + }, + ) + + app := g.App(cronSrv) + app.Run(gctx.GetInitCtx()) +} +``` + +## Combined Usage + +Both server types can be used together in a single application: + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + workerSrv := gjob.NewWorkerServer(gctx.GetInitCtx(), + gjob.WorkerTask{ + Name: "event-worker", + Handler: eventWorkerHandler, + }, + ) + + cronSrv := gjob.NewCronServer(gctx.GetInitCtx(), + gjob.CronTask{ + Name: "cleanup", + Spec: "0 0 2 * * *", // Daily at 2am + Handler: cleanupHandler, + }, + ) + + app := g.App( + gapp.NewHTTPServerAdapter(httpServer), + workerSrv, + cronSrv, + ) + app.Run(gctx.GetInitCtx()) +} +``` + +## API Reference + +### WorkerServer + +| Method | Description | +|---|---| +| `NewWorkerServer(ctx context.Context, tasks ...WorkerTask)` | Creates a new WorkerServer with lifecycle context | +| `Add(tasks ...WorkerTask) error` | Adds tasks before Start; returns error if server already started or stopped | +| `Start() error` | Starts all tasks concurrently | +| `Stop(graceful bool) error` | Stops all tasks; cancels context and waits for goroutines to finish | + +### CronServer + +| Method | Description | +|---|---| +| `NewCronServer(ctx context.Context, tasks ...CronTask)` | Creates a new CronServer with lifecycle context | +| `Add(tasks ...CronTask) error` | Adds tasks before Start; returns error if server already started or stopped | +| `Start() error` | Registers and starts all cron tasks | +| `Stop(graceful bool) error` | Stops the cron scheduler | + +### Config Helper + +| Method | Description | +|---|---| +| `NewServersFromConfig(ctx, HandlerMap)` | Creates servers from `scheduler.job` config | diff --git a/contrib/scheduler/gjob/README.zh_CN.md b/contrib/scheduler/gjob/README.zh_CN.md new file mode 100644 index 00000000000..747ab114c6d --- /dev/null +++ b/contrib/scheduler/gjob/README.zh_CN.md @@ -0,0 +1,230 @@ +# gjob + +GoFrame 应用生命周期管理中的后台任务服务器。 + +## 介绍 + +`gjob` 包提供了两种 `gapp.Server` 实现,用于后台任务处理: + +- **WorkerServer**:管理在独立协程中运行的长驻后台任务 +- **CronServer**:管理按 cron 表达式定时执行的调度任务 + +两种服务器类型均实现了 `gapp.Server` 接口,可以注册到 `gapp.App` 进行统一的生命周期管理。 + +## 基于配置的启动方式 + +`NewServersFromConfig` 辅助函数从应用配置文件中读取任务配置并创建相应的服务器实例。当你希望将任务定义与处理代码解耦时,推荐使用此方式。 + +### 配置格式 + +```yaml +scheduler: + job: + - name: my-worker # 任务唯一名称 + type: worker # 任务类型:"worker" 或 "cron" + enable: true # 是否启用 + - name: my-cron # 任务唯一名称 + type: cron # 任务类型:"worker" 或 "cron" + enable: true # 是否启用 + spec: "*/2 * * * * *" # cron 表达式(仅 cron 类型需要) +``` + +### 使用示例 + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + handlers := gjob.HandlerMap{ + "my-worker": gjob.WorkerHandler(func(ctx context.Context) func() { + go doBackgroundWork(ctx) + return func() { + g.Log().Info(ctx, "worker cleaned up") + } + }), + "my-cron": gjob.CronHandler(func(ctx context.Context) error { + return syncData(ctx) + }), + } + + jobServers := gjob.NewServersFromConfig(context.Background(), handlers) + app := g.App(gapp.NewHTTPServerAdapter(httpServer)) + app.Add(jobServers...) + app.Run(gctx.GetInitCtx()) +} +``` + +被禁用(`enable: false`)的任务或 `HandlerMap` 中没有对应处理函数的任务会自动跳过。 + +## WorkerServer + +WorkerServer 管理在独立协程中运行的后台任务。每个任务接收一个上下文,该上下文在服务器停止时会被取消,任务还可以返回一个可选的清理函数。 + +### WorkerHandler + +```go +type WorkerHandler func(ctx context.Context) func() +``` + +处理器接收一个上下文,并返回一个可选的清理函数。清理函数在服务器关闭、上下文取消后调用。 + +### 使用示例 + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + workerSrv := gjob.NewWorkerServer(gctx.GetInitCtx(), + gjob.WorkerTask{ + Name: "order-worker", + Handler: func(ctx context.Context) func() { + // 开始后台任务。 + go processOrders(ctx) + return func() { + // 关闭时清理资源。 + g.Log().Info(ctx, "order worker cleaned up") + } + }, + }, + ) + + app := g.App(workerSrv) + app.Run(gctx.GetInitCtx()) +} +``` + +## CronServer + +CronServer 管理通过 `gcron` 按 cron 表达式定时执行的任务。任务以单例模式注册,即如果上一次执行尚未完成,则跳过本次触发。 + +### CronHandler + +```go +type CronHandler func(ctx context.Context) error +``` + +### 使用示例 + +```go +package main + +import ( + "context" + + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + cronSrv := gjob.NewCronServer(gctx.GetInitCtx(), + gjob.CronTask{ + Name: "data-sync", + Spec: "0 */5 * * * *", // 每5分钟执行一次 + Handler: func(ctx context.Context) error { + return syncData(ctx) + }, + }, + ) + + app := g.App(cronSrv) + app.Run(gctx.GetInitCtx()) +} +``` + +## 组合使用 + +两种服务器类型可以在同一应用中组合使用: + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + workerSrv := gjob.NewWorkerServer(gctx.GetInitCtx(), + gjob.WorkerTask{ + Name: "event-worker", + Handler: eventWorkerHandler, + }, + ) + + cronSrv := gjob.NewCronServer(gctx.GetInitCtx(), + gjob.CronTask{ + Name: "cleanup", + Spec: "0 0 2 * * *", // 每天凌晨2点 + Handler: cleanupHandler, + }, + ) + + app := g.App( + gapp.NewHTTPServerAdapter(httpServer), + workerSrv, + cronSrv, + ) + app.Run(gctx.GetInitCtx()) +} +``` + +## API 参考 + +### WorkerServer + +| 方法 | 说明 | +|---|---| +| `NewWorkerServer(ctx context.Context, tasks ...WorkerTask)` | 使用生命周期 context 创建 WorkerServer | +| `Add(tasks ...WorkerTask) error` | 在 Start 前添加任务;服务器已启动或已停止时返回错误 | +| `Start() error` | 并发启动所有任务 | +| `Stop(graceful bool) error` | 停止所有任务;取消 context 并等待 goroutine 退出 | + +### CronServer + +| 方法 | 说明 | +|---|---| +| `NewCronServer(ctx context.Context, tasks ...CronTask)` | 使用生命周期 context 创建 CronServer | +| `Add(tasks ...CronTask) error` | 在 Start 前添加任务;服务器已启动或已停止时返回错误 | +| `Start() error` | 注册并启动所有 cron 任务 | +| `Stop(graceful bool) error` | 停止 cron 调度器 | + +### 配置辅助 + +| 方法 | 说明 | +|---|---| +| `NewServersFromConfig(ctx, HandlerMap)` | 从 `scheduler.job` 配置创建服务器 | diff --git a/contrib/scheduler/gjob/gjob.go b/contrib/scheduler/gjob/gjob.go new file mode 100644 index 00000000000..e944a5be424 --- /dev/null +++ b/contrib/scheduler/gjob/gjob.go @@ -0,0 +1,10 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Package gjob provides gapp.Server implementations for background job processing, +// including WorkerServer for long-running background tasks and CronServer for +// scheduled cron tasks. +package gjob diff --git a/contrib/scheduler/gjob/gjob_config.go b/contrib/scheduler/gjob/gjob_config.go new file mode 100644 index 00000000000..f98a698ef00 --- /dev/null +++ b/contrib/scheduler/gjob/gjob_config.go @@ -0,0 +1,171 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Configuration-based helper for creating job servers. + +package gjob + +import ( + "context" + "fmt" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/util/gconv" +) + +// configKey is the configuration key for job tasks. +const configKey = "scheduler.job" + +// taskTypeWorker identifies worker tasks in configuration. +const taskTypeWorker = "worker" + +// taskTypeCron identifies cron tasks in configuration. +const taskTypeCron = "cron" + +// jobConfig holds the configuration for a single job task. +type jobConfig struct { + Name string `json:"name"` + Type string `json:"type"` + Enable bool `json:"enable"` + Spec string `json:"spec"` +} + +// HandlerMap maps task names to their handler functions. +// The value type depends on the task type: +// - worker: WorkerHandler (func(ctx context.Context) func()) +// - cron: CronHandler (func(ctx context.Context) error) +type HandlerMap map[string]any + +// NewServersFromConfig creates gapp.Server instances based on configuration. +// It reads job configurations from the "scheduler.job" config key and matches +// them with handlers from the provided HandlerMap. +// +// Configuration format (YAML example): +// +// scheduler: +// job: +// - name: my-worker +// type: worker +// enable: true +// - name: my-cron +// type: cron +// enable: true +// spec: "*/2 * * * * *" +// +// The returned slice contains only the servers that have at least one task. +// Disabled tasks and tasks without matching handlers are skipped. +func NewServersFromConfig(ctx context.Context, handlers HandlerMap) []gapp.Server { + var ( + configs = loadConfig(ctx) + workerTasks []WorkerTask + cronTasks []CronTask + ) + + for _, cfg := range configs { + if !cfg.Enable { + continue + } + + handler, ok := handlers[cfg.Name] + if !ok { + g.Log().Warningf(ctx, "job config %s: handler not found, skipping", cfg.Name) + continue + } + + switch cfg.Type { + case taskTypeWorker: + h, ok := handler.(WorkerHandler) + if !ok { + g.Log().Warningf(ctx, "job config %s: handler is not WorkerHandler, skipping", cfg.Name) + continue + } + workerTasks = append(workerTasks, WorkerTask{ + Name: cfg.Name, + Handler: h, + }) + + case taskTypeCron: + if cfg.Spec == "" { + g.Log().Warningf(ctx, "job config %s: missing spec for cron task, skipping", cfg.Name) + continue + } + h, ok := handler.(CronHandler) + if !ok { + g.Log().Warningf(ctx, "job config %s: handler is not CronHandler, skipping", cfg.Name) + continue + } + cronTasks = append(cronTasks, CronTask{ + Name: cfg.Name, + Spec: cfg.Spec, + Handler: h, + }) + + default: + g.Log().Warningf(ctx, "job config %s: unknown type %q, skipping", cfg.Name, cfg.Type) + } + } + + var servers []gapp.Server + + if len(workerTasks) > 0 { + servers = append(servers, NewWorkerServer(ctx, workerTasks...)) + } + + if len(cronTasks) > 0 { + servers = append(servers, NewCronServer(ctx, cronTasks...)) + } + + return servers +} + +// loadConfig reads job configurations from the application config file. +func loadConfig(ctx context.Context) []*jobConfig { + cfg, err := g.Cfg().Get(ctx, configKey) + if err != nil { + g.Log().Errorf(ctx, "failed to get job config: %v", err) + return nil + } + + if cfg.IsNil() || cfg.IsEmpty() { + return nil + } + + maps := cfg.Maps() + if len(maps) == 0 { + return nil + } + + var configs []*jobConfig + for _, m := range maps { + jobType := gconv.String(m["type"]) + if jobType == "" { + name := gconv.String(m["name"]) + g.Log().Warningf(ctx, "job config %s: missing type, skipping", name) + continue + } + + if jobType != taskTypeWorker && jobType != taskTypeCron { + name := gconv.String(m["name"]) + g.Log().Warningf(ctx, "job config %s: invalid type %q, skipping", name, jobType) + continue + } + + configs = append(configs, &jobConfig{ + Name: gconv.String(m["name"]), + Type: jobType, + Enable: gconv.Bool(m["enable"]), + Spec: gconv.String(m["spec"]), + }) + } + + return configs +} + +// String returns a human-readable description of the HandlerMap. +func (h HandlerMap) String() string { + return fmt.Sprintf("HandlerMap(%d handlers)", len(h)) +} diff --git a/contrib/scheduler/gjob/gjob_context.go b/contrib/scheduler/gjob/gjob_context.go new file mode 100644 index 00000000000..957d7e29a79 --- /dev/null +++ b/contrib/scheduler/gjob/gjob_context.go @@ -0,0 +1,23 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Lifecycle context helpers for job servers. + +package gjob + +import ( + "context" + + "github.com/gogf/gf/v2/os/gctx" +) + +// normalizeCtx maps nil to gctx.GetInitCtx so job servers inherit framework defaults. +func normalizeCtx(ctx context.Context) context.Context { + if ctx != nil { + return ctx + } + return gctx.GetInitCtx() +} diff --git a/contrib/scheduler/gjob/gjob_cron.go b/contrib/scheduler/gjob/gjob_cron.go new file mode 100644 index 00000000000..d116961991a --- /dev/null +++ b/contrib/scheduler/gjob/gjob_cron.go @@ -0,0 +1,147 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// CronServer implementation for scheduled cron tasks. + +package gjob + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gcron" +) + +// CronHandler is the handler function for a cron task. +type CronHandler func(ctx context.Context) error + +// CronTask defines a cron task that runs on a specified schedule. +type CronTask struct { + // Name is the unique name of the cron task. + Name string + + // Spec is the cron expression that defines the schedule. + // It supports second-level precision, e.g. "*/2 * * * * *". + Spec string + + // Handler is the function that is called on each schedule tick. + Handler CronHandler +} + +// CronServer manages a set of cron tasks that implement the gapp.Server interface. +// Tasks are registered with the internal gcron.Cron scheduler as singleton jobs +// and run on their specified schedules. +// +// CronServer follows a single lifecycle: after Stop, Start returns CodeInvalidOperation. +// Create a new CronServer for another lifecycle round. +// When Stop is called, the scheduler is stopped and no more tasks are executed. +type CronServer struct { + mu sync.Mutex + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + cron *gcron.Cron + tasks []CronTask + started bool + stopped bool +} + +// NewCronServer creates and returns a new CronServer with the given tasks. +// The ctx parameter defines the parent lifecycle context for cron jobs; a nil +// value is normalized to gctx.GetInitCtx(). +// The context.WithCancel is deferred until Start is called to avoid leaking +// resources if the server is created but never started. +func NewCronServer(ctx context.Context, tasks ...CronTask) *CronServer { + return &CronServer{ + parentCtx: normalizeCtx(ctx), + tasks: tasks, + } +} + +// Add adds one or more cron tasks to the server. +// Tasks must be added before Start is called. +// Returns CodeInvalidOperation when the server has already started or stopped. +func (s *CronServer) Add(tasks ...CronTask) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.started { + return gerror.NewCode(gcode.CodeInvalidOperation, "cannot add cron tasks after server started") + } + if s.stopped { + return gerror.NewCode(gcode.CodeInvalidOperation, "cannot add cron tasks after server stopped") + } + s.tasks = append(s.tasks, tasks...) + return nil +} + +// Start starts the cron scheduler and registers all tasks as singleton jobs. +// Registration uses a fresh cron instance so a partial failure does not leave +// the server in an unrecoverable state; callers may retry Start after fixing tasks. +func (s *CronServer) Start() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.started { + return gerror.NewCode(gcode.CodeInvalidOperation, "cron server already started") + } + if s.stopped { + return gerror.NewCode(gcode.CodeInvalidOperation, "cron server already stopped") + } + + s.ctx, s.cancel = context.WithCancel(s.parentCtx) + + cron := gcron.New() + for i := range s.tasks { + task := s.tasks[i] + handler := func(ctx context.Context) { + if err := task.Handler(ctx); err != nil { + g.Log().Errorf(ctx, "cron task %s handle error: %v", task.Name, err) + } + } + + _, err := cron.AddSingleton(s.ctx, task.Spec, handler, task.Name) + if err != nil { + cron.Close() + s.cancel() + return err + } + } + + cron.Start() + s.cron = cron + s.started = true + return nil +} + +// Stop stops the cron scheduler. +// When graceful is true, it waits for in-flight jobs to finish before returning. +// When graceful is false, running jobs are stopped immediately. +func (s *CronServer) Stop(graceful bool) error { + s.mu.Lock() + if !s.started { + s.mu.Unlock() + return nil + } + s.started = false + s.stopped = true + s.mu.Unlock() + + if graceful { + s.cron.StopGracefully() + } else { + s.cron.Stop() + } + s.cron.Close() + s.cancel() + return nil +} + +// Compile-time check that CronServer implements gapp.Server. +var _ gapp.Server = (*CronServer)(nil) diff --git a/contrib/scheduler/gjob/gjob_worker.go b/contrib/scheduler/gjob/gjob_worker.go new file mode 100644 index 00000000000..2f032d13440 --- /dev/null +++ b/contrib/scheduler/gjob/gjob_worker.go @@ -0,0 +1,144 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// WorkerServer implementation for long-running background tasks. + +package gjob + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gapp" +) + +// Compile-time check that WorkerServer implements gapp.Server. +var _ gapp.Server = (*WorkerServer)(nil) + +// WorkerHandler is the handler function for a background worker task. +// It receives a context that is cancelled when the server stops, +// and returns an optional cleanup function that is called on shutdown. +type WorkerHandler func(ctx context.Context) func() + +// WorkerTask defines a background worker task that runs in its own goroutine. +// When the server stops, the task's context is cancelled and the +// handler's cleanup function (if any) is called. +type WorkerTask struct { + // Name is the unique name of the worker task. + Name string + + // Handler is the function that implements the worker logic. + Handler WorkerHandler +} + +// WorkerServer manages a set of background worker tasks that implement the gapp.Server interface. +// Each task runs in its own goroutine and is terminated when the server stops. +// +// WorkerServer follows a single lifecycle: after Stop, Start returns CodeInvalidOperation. +// Create a new WorkerServer for another lifecycle round. +// When Stop is called, the internal context is cancelled to signal all tasks +// to stop. The server always waits for task goroutines to finish before +// returning so lifecycle managers (including gapp rollback) do not proceed early. +type WorkerServer struct { + mu sync.Mutex + parentCtx context.Context + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + tasks []WorkerTask + started bool + stopped bool +} + +// NewWorkerServer creates and returns a new WorkerServer with the given tasks. +// The ctx parameter defines the parent lifecycle context for worker tasks; a nil +// value is normalized to gctx.GetInitCtx(). +// The context.WithCancel is deferred until Start is called to avoid leaking +// resources if the server is created but never started. +func NewWorkerServer(ctx context.Context, tasks ...WorkerTask) *WorkerServer { + return &WorkerServer{ + parentCtx: normalizeCtx(ctx), + tasks: tasks, + } +} + +// Add adds one or more worker tasks to the server. +// Tasks must be added before Start is called. +// Returns CodeInvalidOperation when the server has already started or stopped. +func (s *WorkerServer) Add(tasks ...WorkerTask) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.started { + return gerror.NewCode(gcode.CodeInvalidOperation, "cannot add worker tasks after server started") + } + if s.stopped { + return gerror.NewCode(gcode.CodeInvalidOperation, "cannot add worker tasks after server stopped") + } + s.tasks = append(s.tasks, tasks...) + return nil +} + +// Start starts all registered worker tasks in non-blocking way. +// Each task is launched in its own goroutine with panic recovery. +func (s *WorkerServer) Start() error { + s.mu.Lock() + defer s.mu.Unlock() + + if s.started { + return gerror.NewCode(gcode.CodeInvalidOperation, "worker server already started") + } + if s.stopped { + return gerror.NewCode(gcode.CodeInvalidOperation, "worker server already stopped") + } + + s.ctx, s.cancel = context.WithCancel(s.parentCtx) + + for i := range s.tasks { + task := s.tasks[i] + s.wg.Add(1) + go s.runTask(task) + } + + s.started = true + return nil +} + +// Stop stops the worker server by cancelling the internal context and waiting +// for all task goroutines to finish. The graceful parameter is accepted for +// gapp.Server compatibility; worker shutdown always waits after cancellation. +func (s *WorkerServer) Stop(_ bool) error { + s.mu.Lock() + if !s.started { + s.mu.Unlock() + return nil + } + s.started = false + s.stopped = true + s.mu.Unlock() + + s.cancel() + s.wg.Wait() + + return nil +} + +// runTask runs a single worker task in a goroutine with panic recovery. +func (s *WorkerServer) runTask(task WorkerTask) { + defer s.wg.Done() + + if err := g.Try(s.ctx, func(ctx context.Context) { + cleanup := task.Handler(ctx) + if cleanup != nil { + defer cleanup() + } + <-ctx.Done() + }); err != nil { + g.Log().Errorf(s.ctx, "worker task %s exited with error: %v", task.Name, err) + } +} diff --git a/contrib/scheduler/gjob/gjob_z_unit_test.go b/contrib/scheduler/gjob/gjob_z_unit_test.go new file mode 100644 index 00000000000..396760eb1e5 --- /dev/null +++ b/contrib/scheduler/gjob/gjob_z_unit_test.go @@ -0,0 +1,739 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gjob_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gcfg" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/test/gtest" + + gjob "github.com/gogf/gf/contrib/scheduler/gjob/v2" +) + +func TestWorkerServerImplementsGappServer(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var _ gapp.Server = gjob.NewWorkerServer(nil) + }) +} + +func TestNewWorkerServer(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewWorkerServer(nil) + t.AssertNE(s, nil) + }) +} + +func TestNewWorkerServerWithTasks(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewWorkerServer(nil, + gjob.WorkerTask{Name: "task1", Handler: func(ctx context.Context) func() { return nil }}, + ) + t.AssertNE(s, nil) + }) +} + +func TestWorkerServerAdd(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewWorkerServer(nil) + err := s.Add( + gjob.WorkerTask{Name: "task1", Handler: func(ctx context.Context) func() { return nil }}, + gjob.WorkerTask{Name: "task2", Handler: func(ctx context.Context) func() { return nil }}, + ) + t.AssertNil(err) + }) +} + +func TestWorkerServerAddAfterStart(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewWorkerServer(nil, + gjob.WorkerTask{Name: "task1", Handler: func(ctx context.Context) func() { return nil }}, + ) + err := s.Start() + t.AssertNil(err) + + err = s.Add(gjob.WorkerTask{Name: "task2", Handler: func(ctx context.Context) func() { return nil }}) + t.AssertNE(err, nil) + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestWorkerServerStartStop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + started int32 + cleaned int32 + ) + + s := gjob.NewWorkerServer(nil, + gjob.WorkerTask{ + Name: "test-worker", + Handler: func(ctx context.Context) func() { + atomic.StoreInt32(&started, 1) + return func() { + atomic.StoreInt32(&cleaned, 1) + } + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + // Wait for the handler to be called. + time.Sleep(100 * time.Millisecond) + t.Assert(atomic.LoadInt32(&started), 1) + + err = s.Stop(true) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&cleaned), 1) + }) +} + +func TestWorkerServerStopForceful(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + started int32 + exited int32 + ) + + s := gjob.NewWorkerServer(nil, + gjob.WorkerTask{ + Name: "test-worker", + Handler: func(ctx context.Context) func() { + atomic.StoreInt32(&started, 1) + <-ctx.Done() + atomic.StoreInt32(&exited, 1) + return nil + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + time.Sleep(100 * time.Millisecond) + t.Assert(atomic.LoadInt32(&started), 1) + + err = s.Stop(false) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&exited), 1) + }) +} + +func TestWorkerServerStartTwice(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewWorkerServer(nil) + err := s.Start() + t.AssertNil(err) + err = s.Start() + t.AssertNE(err, nil) + }) +} + +func TestWorkerServerMultipleTasks(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + started []string + cleaned []string + ) + + makeHandler := func(name string) gjob.WorkerHandler { + return func(ctx context.Context) func() { + mu.Lock() + started = append(started, name) + mu.Unlock() + return func() { + mu.Lock() + cleaned = append(cleaned, name) + mu.Unlock() + } + } + } + + s := gjob.NewWorkerServer(nil, + gjob.WorkerTask{Name: "task1", Handler: makeHandler("task1")}, + gjob.WorkerTask{Name: "task2", Handler: makeHandler("task2")}, + ) + + err := s.Start() + t.AssertNil(err) + + time.Sleep(100 * time.Millisecond) + + err = s.Stop(true) + t.AssertNil(err) + + mu.Lock() + t.Assert(len(started), 2) + t.Assert(len(cleaned), 2) + mu.Unlock() + }) +} + +func TestWorkerServerWithContextCancellation(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + handlerCalled int32 + cleaned int32 + ) + + s := gjob.NewWorkerServer(nil, + gjob.WorkerTask{ + Name: "test-ctx", + Handler: func(ctx context.Context) func() { + atomic.StoreInt32(&handlerCalled, 1) + return func() { + atomic.StoreInt32(&cleaned, 1) + } + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + time.Sleep(100 * time.Millisecond) + t.Assert(atomic.LoadInt32(&handlerCalled), 1) + + err = s.Stop(true) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&cleaned), 1) + }) +} + +func TestWorkerServerPropagatesLifecycleContext(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + traceCtx := gctx.WithSpan(context.Background(), "worker-lifecycle") + traceID := gctx.CtxId(traceCtx) + + var ( + mu sync.Mutex + gotID string + ) + done := make(chan struct{}) + s := gjob.NewWorkerServer(traceCtx, gjob.WorkerTask{ + Name: "ctx-worker", + Handler: func(ctx context.Context) func() { + mu.Lock() + gotID = gctx.CtxId(ctx) + mu.Unlock() + close(done) + return nil + }, + }) + + err := s.Start() + t.AssertNil(err) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("worker handler was not invoked") + } + + mu.Lock() + t.Assert(gotID, traceID) + mu.Unlock() + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerPropagatesLifecycleContext(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + traceCtx := gctx.WithSpan(context.Background(), "cron-lifecycle") + traceID := gctx.CtxId(traceCtx) + + var ( + mu sync.Mutex + gotID string + ) + done := make(chan struct{}) + s := gjob.NewCronServer(traceCtx, gjob.CronTask{ + Name: "ctx-cron", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { + mu.Lock() + if gotID == "" { + gotID = gctx.CtxId(ctx) + close(done) + } + mu.Unlock() + return nil + }, + }) + + err := s.Start() + t.AssertNil(err) + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("cron handler was not invoked") + } + + mu.Lock() + t.Assert(gotID, traceID) + mu.Unlock() + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerImplementsGappServer(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var _ gapp.Server = gjob.NewCronServer(nil) + }) +} + +func TestWorkerServerStopBeforeStart(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewWorkerServer(nil) + err := s.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerStopGracefulWaitsInFlightJob(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + running int32 + done int32 + ) + + s := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "slow-cron", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { + atomic.StoreInt32(&running, 1) + time.Sleep(300 * time.Millisecond) + atomic.StoreInt32(&done, 1) + return nil + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + for i := 0; i < 50 && atomic.LoadInt32(&running) == 0; i++ { + time.Sleep(20 * time.Millisecond) + } + t.Assert(atomic.LoadInt32(&running), int32(1)) + t.Assert(atomic.LoadInt32(&done), int32(0)) + + err = s.Stop(true) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&done), int32(1)) + }) +} + +func TestNewCronServer(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewCronServer(nil) + t.AssertNE(s, nil) + }) +} + +func TestCronServerAdd(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewCronServer(nil) + err := s.Add( + gjob.CronTask{Name: "task1", Spec: "*/1 * * * * *", Handler: func(ctx context.Context) error { return nil }}, + ) + t.AssertNil(err) + }) +} + +func TestCronServerAddAfterStart(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewCronServer(nil, + gjob.CronTask{Name: "task1", Spec: "*/1 * * * * *", Handler: func(ctx context.Context) error { return nil }}, + ) + err := s.Start() + t.AssertNil(err) + + err = s.Add(gjob.CronTask{Name: "task2", Spec: "*/1 * * * * *", Handler: func(ctx context.Context) error { return nil }}) + t.AssertNE(err, nil) + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerStartStop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var called int32 + + s := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "test-cron", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { + atomic.AddInt32(&called, 1) + return nil + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + // Wait for at least one tick. + time.Sleep(1500 * time.Millisecond) + t.Assert(atomic.LoadInt32(&called) >= 1, true) + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerStartTwice(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewCronServer(nil, + gjob.CronTask{Name: "t", Spec: "*/1 * * * * *", Handler: func(ctx context.Context) error { return nil }}, + ) + + err := s.Start() + t.AssertNil(err) + err = s.Start() + t.AssertNE(err, nil) + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerStopForceful(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var called int32 + + s := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "test-cron-force", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { + atomic.AddInt32(&called, 1) + return nil + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + time.Sleep(1500 * time.Millisecond) + + err = s.Stop(false) + t.AssertNil(err) + }) +} + +func TestCronServerInvalidSpec(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "invalid", + Spec: "invalid-spec", + Handler: func(ctx context.Context) error { return nil }, + }, + ) + + err := s.Start() + t.AssertNE(err, nil) + }) +} + +func TestCronServerStartPartialFailureRetry(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "good", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { return nil }, + }, + gjob.CronTask{ + Name: "bad", + Spec: "invalid-spec", + Handler: func(ctx context.Context) error { return nil }, + }, + ) + + err := s.Start() + t.AssertNE(err, nil) + + // A second Start must fail with the same validation error, not duplicate-name. + err = s.Start() + t.AssertNE(err, nil) + t.AssertIN("invalid pattern", err.Error()) + + // After fixing tasks, Start on a fresh server succeeds. + fixed := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "good", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { return nil }, + }, + ) + err = fixed.Start() + t.AssertNil(err) + err = fixed.Stop(true) + t.AssertNil(err) + }) +} + +func TestCronServerHandlerError(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var called int32 + + s := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "error-cron", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { + atomic.AddInt32(&called, 1) + return context.DeadlineExceeded + }, + }, + ) + + err := s.Start() + t.AssertNil(err) + + // Wait for at least one tick, handler should not crash the server. + time.Sleep(1500 * time.Millisecond) + t.Assert(atomic.LoadInt32(&called) >= 1, true) + + err = s.Stop(true) + t.AssertNil(err) + }) +} + +func TestWorkerAndCronWithGapp(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + workerStarted int32 + cronCalled int32 + ) + + workerSrv := gjob.NewWorkerServer(nil, + gjob.WorkerTask{ + Name: "app-worker", + Handler: func(ctx context.Context) func() { + atomic.StoreInt32(&workerStarted, 1) + return nil + }, + }, + ) + + cronSrv := gjob.NewCronServer(nil, + gjob.CronTask{ + Name: "app-cron", + Spec: "*/1 * * * * *", + Handler: func(ctx context.Context) error { + atomic.AddInt32(&cronCalled, 1) + return nil + }, + }, + ) + + app := gapp.New(workerSrv, cronSrv) + + err := app.Start(context.Background()) + t.AssertNil(err) + + time.Sleep(1500 * time.Millisecond) + + t.Assert(atomic.LoadInt32(&workerStarted), 1) + t.Assert(atomic.LoadInt32(&cronCalled) >= 1, true) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + }) +} + +const testJobConfig = ` +scheduler: + job: + - name: worker1 + type: worker + enable: true + - name: cron1 + type: cron + enable: true + spec: "*/1 * * * * *" + - name: disabled-worker + type: worker + enable: false + - name: no-type + enable: true + - name: invalid-type + type: unknown + enable: true +` + +func TestNewServersFromConfig(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + // Set up test configuration. + adapter, ok := g.Cfg().GetAdapter().(*gcfg.AdapterFile) + if !ok { + t.Fatal("expected gcfg.AdapterFile") + } + adapter.SetContent(testJobConfig) + defer adapter.SetContent("") + + var ( + workerStarted int32 + cronCalled int32 + ) + + handlers := gjob.HandlerMap{ + "worker1": gjob.WorkerHandler(func(ctx context.Context) func() { + atomic.StoreInt32(&workerStarted, 1) + return nil + }), + "cron1": gjob.CronHandler(func(ctx context.Context) error { + atomic.AddInt32(&cronCalled, 1) + return nil + }), + } + + servers := gjob.NewServersFromConfig(context.Background(), handlers) + // Should create 2 servers: one WorkerServer and one CronServer. + t.Assert(len(servers), 2) + + app := gapp.New(servers...) + err := app.Start(context.Background()) + t.AssertNil(err) + + time.Sleep(1500 * time.Millisecond) + + t.Assert(atomic.LoadInt32(&workerStarted), 1) + t.Assert(atomic.LoadInt32(&cronCalled) >= 1, true) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + }) +} + +func TestNewServersFromConfigNoHandlers(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + adapter, ok := g.Cfg().GetAdapter().(*gcfg.AdapterFile) + if !ok { + t.Fatal("expected gcfg.AdapterFile") + } + adapter.SetContent(testJobConfig) + defer adapter.SetContent("") + + // No handlers provided. + servers := gjob.NewServersFromConfig(context.Background(), gjob.HandlerMap{}) + t.Assert(len(servers), 0) + }) +} + +func TestNewServersFromConfigWrongHandlerType(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + adapter, ok := g.Cfg().GetAdapter().(*gcfg.AdapterFile) + if !ok { + t.Fatal("expected gcfg.AdapterFile") + } + adapter.SetContent(testJobConfig) + defer adapter.SetContent("") + + // Provide wrong handler types. + handlers := gjob.HandlerMap{ + "worker1": func() {}, // Not a WorkerHandler + "cron1": func() {}, // Not a CronHandler + } + + servers := gjob.NewServersFromConfig(context.Background(), handlers) + // All handlers have wrong types, so no servers should be created. + t.Assert(len(servers), 0) + }) +} + +func TestNewServersFromConfigEmptyConfig(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + adapter, ok := g.Cfg().GetAdapter().(*gcfg.AdapterFile) + if !ok { + t.Fatal("expected gcfg.AdapterFile") + } + adapter.SetContent("") + defer adapter.SetContent("") + + servers := gjob.NewServersFromConfig(context.Background(), gjob.HandlerMap{ + "worker1": gjob.WorkerHandler(func(ctx context.Context) func() { return nil }), + }) + t.Assert(len(servers), 0) + }) +} + +func TestNewServersFromConfigCronMissingSpec(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + const cronMissingSpecConfig = ` +scheduler: + job: + - name: cron1 + type: cron + enable: true +` + adapter, ok := g.Cfg().GetAdapter().(*gcfg.AdapterFile) + if !ok { + t.Fatal("expected gcfg.AdapterFile") + } + adapter.SetContent(cronMissingSpecConfig) + defer adapter.SetContent("") + + handlers := gjob.HandlerMap{ + "cron1": gjob.CronHandler(func(ctx context.Context) error { return nil }), + } + + servers := gjob.NewServersFromConfig(context.Background(), handlers) + t.Assert(len(servers), 0) + }) +} + +func TestNewServersFromConfigOnlyCron(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + const cronOnlyConfig = ` +scheduler: + job: + - name: cron1 + type: cron + enable: true + spec: "*/1 * * * * *" +` + adapter, ok := g.Cfg().GetAdapter().(*gcfg.AdapterFile) + if !ok { + t.Fatal("expected gcfg.AdapterFile") + } + adapter.SetContent(cronOnlyConfig) + defer adapter.SetContent("") + + handlers := gjob.HandlerMap{ + "cron1": gjob.CronHandler(func(ctx context.Context) error { return nil }), + } + + servers := gjob.NewServersFromConfig(context.Background(), handlers) + // Only one CronServer should be created. + t.Assert(len(servers), 1) + }) +} diff --git a/contrib/scheduler/gjob/go.mod b/contrib/scheduler/gjob/go.mod new file mode 100644 index 00000000000..7b7e86a0c61 --- /dev/null +++ b/contrib/scheduler/gjob/go.mod @@ -0,0 +1,37 @@ +module github.com/gogf/gf/contrib/scheduler/gjob/v2 + +go 1.23.0 + +require github.com/gogf/gf/v2 v2.10.2 + +require ( + github.com/BurntSushi/toml v1.5.0 // indirect + github.com/clbanning/mxj/v2 v2.7.0 // indirect + github.com/emirpasic/gods/v2 v2.0.0-alpha // indirect + github.com/fatih/color v1.18.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/grokify/html-strip-tags-go v0.1.0 // indirect + github.com/magiconair/properties v1.8.10 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/olekukonko/errors v1.1.0 // indirect + github.com/olekukonko/ll v0.0.9 // indirect + github.com/olekukonko/tablewriter v1.1.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/sdk v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/text v0.25.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/gogf/gf/v2 => ../../../ diff --git a/contrib/scheduler/gjob/go.sum b/contrib/scheduler/gjob/go.sum new file mode 100644 index 00000000000..0718fa9fee4 --- /dev/null +++ b/contrib/scheduler/gjob/go.sum @@ -0,0 +1,79 @@ +github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= +github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME= +github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/emirpasic/gods/v2 v2.0.0-alpha h1:dwFlh8pBg1VMOXWGipNMRt8v96dKAIvBehtCt6OtunU= +github.com/emirpasic/gods/v2 v2.0.0-alpha/go.mod h1:W0y4M2dtBB9U5z3YlghmpuUhiaZT2h6yoeE+C1sCp6A= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grokify/html-strip-tags-go v0.1.0 h1:03UrQLjAny8xci+R+qjCce/MYnpNXCtgzltlQbOBae4= +github.com/grokify/html-strip-tags-go v0.1.0/go.mod h1:ZdzgfHEzAfz9X6Xe5eBLVblWIxXfYSQ40S/VKrAOGpc= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= +github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/olekukonko/errors v1.1.0 h1:RNuGIh15QdDenh+hNvKrJkmxxjV4hcS50Db478Ou5sM= +github.com/olekukonko/errors v1.1.0/go.mod h1:ppzxA5jBKcO1vIpCXQ9ZqgDh8iwODz6OXIGKU8r5m4Y= +github.com/olekukonko/ll v0.0.9 h1:Y+1YqDfVkqMWuEQMclsF9HUR5+a82+dxJuL1HHSRpxI= +github.com/olekukonko/ll v0.0.9/go.mod h1:En+sEW0JNETl26+K8eZ6/W4UQ7CYSrrgg/EdIYT2H8g= +github.com/olekukonko/tablewriter v1.1.0 h1:N0LHrshF4T39KvI96fn6GT8HEjXRXYNDrDjKFDB7RIY= +github.com/olekukonko/tablewriter v1.1.0/go.mod h1:5c+EBPeSqvXnLLgkm9isDdzR3wjfBkHR9Nhfp3NWrzo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= +golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/frame/g/g_object.go b/frame/g/g_object.go index d09e75f6520..6afab8fab32 100644 --- a/frame/g/g_object.go +++ b/frame/g/g_object.go @@ -15,6 +15,7 @@ import ( "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/net/gudp" + "github.com/gogf/gf/v2/os/gapp" "github.com/gogf/gf/v2/os/gcfg" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gres" @@ -42,6 +43,13 @@ func UDPServer(name ...any) *gudp.Server { return gudp.GetServer(name...) } +// App creates and returns a new App instance with optional initial servers. +// The App manages the lifecycle of multiple servers with unified start/stop +// and signal handling for graceful shutdown. +func App(servers ...gapp.Server) *gapp.App { + return gapp.New(servers...) +} + // View returns an instance of template engine object with specified name. func View(name ...string) *gview.View { return gins.View(name...) diff --git a/net/ghttp/ghttp_server_admin.go b/net/ghttp/ghttp_server_admin.go index 4178a4ca85a..408a3bb1a56 100644 --- a/net/ghttp/ghttp_server_admin.go +++ b/net/ghttp/ghttp_server_admin.go @@ -73,10 +73,15 @@ func (p *utilAdmin) Restart(r *Request) { // Shutdown shuts down all the servers. func (p *utilAdmin) Shutdown(r *Request) { + // Capture stable references before entering the timer closure, + // since the request lifecycle ends after WriteExit returns. + server := r.Server gtimer.SetTimeout(r.Context(), time.Second, func(ctx context.Context) { // It shuts down the server after 1 second, which is not triggered by system signal, // to ensure the response successfully to the client. - _ = r.Server.Shutdown() + if err := server.Shutdown(); err != nil { + server.Logger().Errorf(ctx, "server shutdown failed: %v", err) + } }) r.Response.WriteExit("server shutdown") } @@ -91,10 +96,9 @@ func (s *Server) EnableAdmin(pattern ...string) { s.BindObject(p, &utilAdmin{}) } -// Shutdown shuts the current server. -func (s *Server) Shutdown() error { - var ctx = context.TODO() - // Remove plugins. +// stopServers removes plugins, deregisters the service discovery endpoint if any, +// then shuts down or force-closes all underlying HTTP servers. +func (s *Server) stopServers(ctx context.Context, forceful bool) { if len(s.plugins) > 0 { for _, p := range s.plugins { s.Logger().Printf(ctx, `remove plugin: %s`, p.Name()) @@ -105,11 +109,33 @@ func (s *Server) Shutdown() error { } s.doServiceDeregister() - // Only shut down current servers. - // It may have multiple underlying http servers. for _, v := range s.servers { - v.Shutdown(ctx) + if forceful { + v.Close(ctx) + } else { + v.Shutdown(ctx) + } } - s.Logger().Infof(ctx, "pid[%d]: all servers shutdown", gproc.Pid()) + + if forceful { + s.Logger().Infof(ctx, "pid[%d]: all servers closed", gproc.Pid()) + } else { + s.Logger().Infof(ctx, "pid[%d]: all servers shutdown", gproc.Pid()) + } +} + +// Shutdown gracefully shuts the current server. +// It waits for all in-flight requests to complete before shutting down. +func (s *Server) Shutdown() error { + var ctx = context.TODO() + s.stopServers(ctx, false) + return nil +} + +// Close forcibly closes the current server. +// Unlike Shutdown, it does not wait for in-flight requests to complete. +func (s *Server) Close() error { + var ctx = context.TODO() + s.stopServers(ctx, true) return nil } diff --git a/os/gapp/README.md b/os/gapp/README.md new file mode 100644 index 00000000000..c877fb1c1c1 --- /dev/null +++ b/os/gapp/README.md @@ -0,0 +1,155 @@ +# gapp + +Application-level lifecycle management for multiple servers. + +## Introduction + +The `gapp` package provides a unified `Server` interface and an `App` struct that coordinates the startup and shutdown of multiple servers, including signal handling for graceful shutdown. It also supports an `Option` mechanism for structured application initialization before servers start. + +## Context propagation + +- `Boot`, `Start`, `Stop`, and `Run` take a `context.Context` for tracing and cancellation. +- A nil argument is normalized to `gctx.GetInitCtx()`, matching framework defaults such as `ghttp.Server.Start`. +- After the first successful `Boot`, subsequent `Start`/`Stop` calls with a nil argument reuse the normalized context captured during `Boot`. +- During concurrent server startup in `Start`, if the resolved context completes first, partially started servers are rolled back similar to startup failure cleanup. +- `Run` shuts down gracefully when receiving an OS shutdown signal **or** when the root context completes; `Stop` receives `gctx.NeverDone(root)` so trace metadata survives while teardown itself is not shortened by parent cancellation. + +## Server Interface + +```go +type Server interface { + Start() error + Stop(graceful bool) error +} +``` + +- `Start()`: Starts the server in a non-blocking way. +- `Stop(graceful)`: Stops the server. When `graceful` is `true`, the server waits for in-flight requests to complete before shutting down. When `graceful` is `false`, the server is forcibly closed. + +## Adapters + +The package provides adapter constructors for built-in server types: + +| Constructor | Wraps | +|---|---| +| `NewHTTPServerAdapter(s *ghttp.Server)` | HTTP server | +| `NewTCPServerAdapter(s *gtcp.Server)` | TCP server | +| `NewUDPServerAdapter(s *gudp.Server)` | UDP server | + +For gRPC server, use `grpcx.NewGappServerAdapter(s *grpcx.GrpcServer)` from the `contrib/rpc/grpcx` package. + +## Boot / Initialization + +The `Option` type allows registering initialization logic that runs before servers start. This provides a structured way to set up databases, caches, external connections, and other application-level concerns. + +### Option Interface + +```go +type Option interface { + Apply(ctx context.Context, app *App) (func(ctx context.Context), error) +} +``` + +The `Apply` method runs during `Boot()`. It can return an optional cleanup function that will be called during `Stop()` in reverse registration order. + +### Option Constructors + +| Constructor | Description | +|---|---| +| `NewOption(f)` | One-shot initialization, no cleanup | +| `NewOptionWithHook(f)` | Initialization with optional cleanup function | + +### Usage + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("hello") + }) + + app := g.App(gapp.NewHTTPServerAdapter(httpServer)) + + // One-shot initialization + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + g.Log().Info(ctx, "application initialized") + })) + + // Initialization with cleanup + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + conn := connectToExternalService() + return func(ctx context.Context) { + conn.Close() + }, nil + })) + + app.Run(gctx.GetInitCtx()) +} +``` + +### Lifecycle Order + +1. `Boot(ctx)` -- Applies all Options in registration order (`nil ctx` behaves like `GetInitCtx()`) +2. `Start(ctx)` -- Starts all servers concurrently (`nil ctx` after Boot reuses Boot context) +3. (running under `Run`) -- Blocks until shutdown signal or root ctx completion +4. `Stop(ctx, graceful)` -- Runs cleanup hooks in reverse order, then stops servers in reverse order + +If `Boot()` is not called explicitly, `Start()` calls it automatically using the resolved `Start` context. + +## Single lifecycle + +`App` is designed for a **single process lifecycle** (typical `main()` usage): + +- The first successful `Stop` runs cleanup hooks and stops all servers. Later `Stop` calls are no-ops. +- `Start` after `Stop` may succeed for individual servers, but `App.Stop` will not stop them again. Create a new `App` for another round. +- Job servers in `contrib/scheduler/gjob` (`WorkerServer`, `CronServer`) also reject `Start` after `Stop`. Add tasks only before `Start`; `Add` returns an error if called after the server has started or stopped. + +When using `gapp.Run()`, register servers through adapters and call `StartManaged()`-style APIs on contrib servers. Do **not** call legacy blocking `Run()` on the same servers (for example `grpcx.GrpcServer.Run()` or ghttp admin restart paths), or you may register competing OS signal handlers. + +## Usage + +```go +package main + +import ( + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("hello") + }) + + app := g.App(gapp.NewHTTPServerAdapter(httpServer)) + app.Run(gctx.GetInitCtx()) +} +``` + +## App Methods + +| Method | Description | +|---|---| +| `New(servers ...Server)` | Creates a new App with optional initial servers | +| `Add(servers ...Server)` | Adds servers to the App | +| `Option(opts ...Option)` | Registers Options to apply during Boot | +| `Boot(ctx context.Context) error` | Applies all Options; idempotent | +| `Booted() bool` | Returns whether Boot has been called successfully | +| `Start(ctx context.Context) error` | Starts servers concurrently (`nil ctx` reuses Boot context); honors ctx during startup waits | +| `Run(ctx context.Context) error` | Boots, starts, registers handlers, blocks until shutdown signal or ctx completes; returns Stop error on shutdown failure | +| `Stop(ctx context.Context, graceful bool) error` | Cleanup hooks then reverse server shutdown (`nil ctx` reuses Boot context) | +| `Servers() []Server` | Returns a copy of the registered servers | diff --git a/os/gapp/README.zh_CN.md b/os/gapp/README.zh_CN.md new file mode 100644 index 00000000000..d4910eeab05 --- /dev/null +++ b/os/gapp/README.zh_CN.md @@ -0,0 +1,155 @@ +# gapp + +多服务器应用级别的生命周期管理。 + +## 介绍 + +`gapp` 包提供了统一的 `Server` 接口和 `App` 结构体,用于协调多个服务器的启动和关闭,包括优雅关闭的信号处理。同时支持通过 `Option` 机制在服务器启动前进行结构化的应用初始化。 + +## Context 传递 + +- `Boot`、`Start`、`Stop`、`Run` 均接收 `context.Context`,用于链路追踪与取消传播。 +- 参数为 nil 时使用 `gctx.GetInitCtx()`,与框架内其他入口(如 `ghttp.Server.Start`)一致。 +- 首次 `Boot` 成功后,其后 `Start`/`Stop` 若传入 nil,会沿用该次 Boot 所使用的规范化 context。 +- `Start` 并发启动服务器时,若外层 context 先结束,已对成功启动的服务器按与原启动失败一致的策略回滚关闭。 +- `Run` 在收到 OS 关闭信号 **或** root context 结束时触发优雅关闭;实际的 `Stop` 使用 `gctx.NeverDone(root)`,在保留链路元数据的同时,避免 teardown 被子 context 的超时或取消提前打断。 + +## Server 接口 + +```go +type Server interface { + Start() error + Stop(graceful bool) error +} +``` + +- `Start()`:以非阻塞方式启动服务器。 +- `Stop(graceful)`:停止服务器。当 `graceful` 为 `true` 时,服务器等待正在处理的请求完成后再关闭。当 `graceful` 为 `false` 时,服务器被强制关闭。 + +## 适配器 + +该包为内置服务器类型提供了适配器构造函数: + +| 构造函数 | 包装类型 | +|---|---| +| `NewHTTPServerAdapter(s *ghttp.Server)` | HTTP 服务器 | +| `NewTCPServerAdapter(s *gtcp.Server)` | TCP 服务器 | +| `NewUDPServerAdapter(s *gudp.Server)` | UDP 服务器 | + +对于 gRPC 服务器,请使用 `contrib/rpc/grpcx` 包中的 `grpcx.NewGappServerAdapter(s *grpcx.GrpcServer)`。 + +## 启动 / 初始化 + +`Option` 类型允许在服务器启动前注册初始化逻辑,为数据库、缓存、外部连接等应用级别的初始化提供结构化的方式。 + +### Option 接口 + +```go +type Option interface { + Apply(ctx context.Context, app *App) (func(ctx context.Context), error) +} +``` + +`Apply` 方法在 `Boot()` 期间执行。它可以返回一个可选的清理函数,该函数将在 `Stop()` 期间按注册的逆序调用。 + +### Option 构造函数 + +| 构造函数 | 描述 | +|---|---| +| `NewOption(f)` | 一次性初始化,无清理函数 | +| `NewOptionWithHook(f)` | 初始化并返回可选的清理函数 | + +### 使用示例 + +```go +package main + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("hello") + }) + + app := g.App(gapp.NewHTTPServerAdapter(httpServer)) + + // 一次性初始化 + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + g.Log().Info(ctx, "应用初始化完成") + })) + + // 带清理函数的初始化 + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + conn := connectToExternalService() + return func(ctx context.Context) { + conn.Close() + }, nil + })) + + app.Run(gctx.GetInitCtx()) +} +``` + +### 生命周期顺序 + +1. `Boot(ctx)` -- 按注册顺序应用所有 Option(`nil` context 等价于 `GetInitCtx()`) +2. `Start(ctx)` -- 并发启动所有服务器(`Boot` 后若 `ctx` 为 nil,则沿用 Boot context) +3. `Run` 运行阶段阻塞直到关闭信号或 root context 结束 +4. `Stop(ctx, graceful)` -- 按逆序执行清理函数,然后按逆序停止服务器 + +未显式调用 `Boot()` 时,`Start()` 会使用本次 `Start` 解析得到的 context 自动执行 `Boot`。 + +## 单次生命周期 + +`App` 面向 **单次进程生命周期**(典型的 `main()` 用法): + +- 首次成功的 `Stop` 会执行清理 hook 并停止所有服务器,之后的 `Stop` 调用为 no-op。 +- `Stop` 之后再次 `Start` 可能对个别服务器成功,但 `App.Stop` 不会再次停止它们。需要新一轮生命周期时请创建新的 `App`。 +- `contrib/scheduler/gjob` 中的任务服务器(`WorkerServer`、`CronServer`)在 `Stop` 后也会拒绝再次 `Start`。任务只能在 `Start` 之前添加;若服务器已启动或已停止,`Add` 会返回错误。 + +使用 `gapp.Run()` 时,请通过 adapter 注册服务器,并在 contrib 服务器上使用 `StartManaged()` 等非阻塞入口。不要对同一进程中的同一服务器再调用 legacy 阻塞 `Run()`(例如 `grpcx.GrpcServer.Run()` 或 ghttp admin 重启路径),否则可能注册冲突的 OS 信号处理器。 + +## 使用示例 + +```go +package main + +import ( + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" +) + +func main() { + httpServer := g.Server() + httpServer.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("hello") + }) + + app := g.App(gapp.NewHTTPServerAdapter(httpServer)) + app.Run(gctx.GetInitCtx()) +} +``` + +## App 方法 + +| 方法 | 描述 | +|---|---| +| `New(servers ...Server)` | 创建新的 App,可选传入初始服务器 | +| `Add(servers ...Server)` | 向 App 添加服务器 | +| `Option(opts ...Option)` | 注册在 Boot 期间应用的 Option | +| `Boot(ctx context.Context) error` | 应用所有 Option;幂等操作 | +| `Booted() bool` | 返回 Boot 是否已成功调用 | +| `Start(ctx context.Context) error` | 并发启动服务器(`nil ctx` 复用 Boot context);在等待启动完成时会响应 ctx | +| `Run(ctx context.Context) error` | Boot、Start、注册处理函数,阻塞直至信号或 ctx 结束;关闭失败时返回 Stop 错误 | +| `Stop(ctx context.Context, graceful bool) error` | 清理 hook 后以逆序停止服务器(`nil ctx` 复用 Boot context) | +| `Servers() []Server` | 返回已注册服务器的副本 | diff --git a/os/gapp/gapp.go b/os/gapp/gapp.go new file mode 100644 index 00000000000..ce68c40cf06 --- /dev/null +++ b/os/gapp/gapp.go @@ -0,0 +1,466 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Package gapp provides application-level lifecycle management for multiple servers. +// +// It defines a unified Server interface and an App struct that coordinates +// the startup and shutdown of all registered servers, including signal handling +// for graceful shutdown. +package gapp + +import ( + "context" + "os" + "sync" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/gproc" +) + +// Server is the interface for server lifecycle management. +// It defines the minimal contract for servers that can be managed by App. +type Server interface { + // Start starts the server in non-blocking way. + Start() error + + // Stop stops the server. + // The parameter graceful indicates whether to stop gracefully. + // When graceful is true, the server waits for in-flight requests to complete + // before shutting down. When graceful is false, the server is forcibly closed. + Stop(graceful bool) error +} + +// App manages the lifecycle of multiple Server instances. +// It provides unified startup, shutdown, and signal handling +// for all registered servers. +type App struct { + mu sync.RWMutex + servers []Server + options []Option // registered options to apply during Boot + hooks []func(ctx context.Context) // cleanup functions collected during Boot + booted bool // whether Boot has been called successfully + booting bool // whether Boot is in progress + lastBootErr error // result of the last failed Boot for concurrent waiters + bootCond sync.Cond // coordinates concurrent Boot callers + lifecycleCtx context.Context // context from first successful Boot; used when Start/Stop get nil ctx + logger *glog.Logger + stopOnce sync.Once // ensures Stop is executed only once +} + +// New creates and returns a new App instance with optional initial servers. +func New(servers ...Server) *App { + app := &App{ + servers: make([]Server, 0), + options: make([]Option, 0), + hooks: make([]func(ctx context.Context), 0), + logger: glog.New(), + } + app.bootCond.L = &app.mu + if len(servers) > 0 { + app.servers = append(app.servers, servers...) + } + return app +} + +// shutdownModeGracefully identifies a graceful shutdown in log messages. +const shutdownModeGracefully = "gracefully" + +// shutdownModeForcefully identifies a forceful shutdown in log messages. +const shutdownModeForcefully = "forcefully" + +// normalizeCtx maps nil to gctx.GetInitCtx so callers use the framework default propagation context. +func normalizeCtx(ctx context.Context) context.Context { + if ctx != nil { + return ctx + } + return gctx.GetInitCtx() +} + +// resolveCtx returns explicit ctx when non-nil, otherwise lifecycleCtx after Boot or GetInitCtx. +func (app *App) resolveCtx(ctx context.Context) context.Context { + if ctx != nil { + return ctx + } + app.mu.RLock() + lc := app.lifecycleCtx + app.mu.RUnlock() + if lc != nil { + return lc + } + return gctx.GetInitCtx() +} + +// Add adds one or more Server instances to the App. +func (app *App) Add(servers ...Server) { + app.mu.Lock() + defer app.mu.Unlock() + app.servers = append(app.servers, servers...) +} + +// Option registers one or more Options to be applied during Boot. +// Options are applied in registration order. +func (app *App) Option(opts ...Option) { + app.mu.Lock() + defer app.mu.Unlock() + app.options = append(app.options, opts...) +} + +// Boot applies all registered Options in registration order. +// Each Option's Apply method is called, and any returned cleanup +// functions are collected for later execution during Stop(). +// +// Boot is idempotent: calling it multiple times is safe and subsequent +// calls after the first are no-ops. +// +// If an Option's Apply returns an error, Boot rolls back by running +// any already-collected cleanup hooks in reverse order and returns +// the error. +// +// If ctx is nil, gctx.GetInitCtx() is used for Apply and rollback. +func (app *App) Boot(ctx context.Context) error { + app.mu.Lock() + for { + if app.booted { + app.mu.Unlock() + return nil + } + if app.booting { + app.bootCond.Wait() + if app.booted { + app.mu.Unlock() + return nil + } + if !app.booting { + err := app.lastBootErr + app.mu.Unlock() + return err + } + continue + } + app.booting = true + app.lastBootErr = nil + break + } + baseCtx := normalizeCtx(ctx) + + // Release the lock before applying options. applyOptions reads from + // app.options by index so that Options registered dynamically during + // Apply() (via app.Option()) are also processed. + app.mu.Unlock() + + // Ensure booting is always reset and waiters are unblocked, + // even if applyOptions panics during rollback. + var err error + defer func() { + app.mu.Lock() + app.booting = false + if err == nil { + app.booted = true + if app.lifecycleCtx == nil { + app.lifecycleCtx = baseCtx + } + } else { + app.lastBootErr = gerror.WrapCode(gcode.CodeInternalError, err, "app boot failed") + err = app.lastBootErr + } + app.bootCond.Broadcast() + app.mu.Unlock() + }() + + err = app.applyOptions(baseCtx, 0) + + if err == nil { + app.logger.Infof(baseCtx, "app booted successfully") + } + return err +} + +// applyOptions runs registered Options and collects cleanup hooks. +// It reads from app.options by index so that Options registered +// dynamically during Apply() (via app.Option()) are also processed. +func (app *App) applyOptions(baseCtx context.Context, startIdx int) error { + for { + app.mu.RLock() + if startIdx >= len(app.options) { + app.mu.RUnlock() + break + } + opt := app.options[startIdx] + app.mu.RUnlock() + + hook, err := opt.Apply(baseCtx, app) + if err != nil { + // Rollback: run already-collected hooks in reverse. + app.runHooksReverse(baseCtx) + return err + } + if hook != nil { + app.mu.Lock() + app.hooks = append(app.hooks, hook) + app.mu.Unlock() + } + + startIdx++ + } + return nil +} + +// Booted returns whether Boot has been called successfully. +func (app *App) Booted() bool { + app.mu.RLock() + defer app.mu.RUnlock() + return app.booted +} + +// runHooksReverse atomically swaps out all collected cleanup hooks, then runs +// them in reverse order. The hooks slice is cleared before execution so that +// hooks appended concurrently (e.g. by applyOptions during Boot) are not +// accidentally discarded, and a panic in one hook does not cause double-cleanup +// on a subsequent call. +func (app *App) runHooksReverse(ctx context.Context) { + app.mu.Lock() + hooks := app.hooks + app.hooks = app.hooks[:0] + app.mu.Unlock() + + for i := len(hooks) - 1; i >= 0; i-- { + func() { + defer func() { + if r := recover(); r != nil { + app.logger.Errorf(ctx, "cleanup hook panicked: %v", r) + } + }() + hooks[i](ctx) + }() + } +} + +// rollbackStartedServers force-stops servers whose Start succeeded, in reverse registration order. +func (app *App) rollbackStartedServers(ctx context.Context, servers []Server, startOK []bool) { + for i := len(servers) - 1; i >= 0; i-- { + if !startOK[i] { + continue + } + if err := servers[i].Stop(false); err != nil { + app.logger.Errorf(ctx, "server rollback stop failed during start: %v", err) + } + } +} + +// Start starts all registered servers in non-blocking way. +// It starts all servers concurrently and returns the first error encountered, +// or nil if all servers started successfully. +// +// When one or more servers fail to start after others have succeeded, each +// server that did start successfully is force-stopped in reverse registration +// order before the error is returned. +// +// When ctx becomes done before all Server.Start calls complete, servers that +// already started successfully are force-stopped in reverse order and this +// method returns ctx.Err(). +// +// If Boot has not been called yet, it is called automatically first using the +// same resolved context as this call. If ctx is nil, lifecycleCtx from Boot +// or gctx.GetInitCtx() applies. +func (app *App) Start(ctx context.Context) error { + ctx = app.resolveCtx(ctx) + if err := app.Boot(ctx); err != nil { + return err + } + + app.mu.RLock() + servers := make([]Server, len(app.servers)) + copy(servers, app.servers) + app.mu.RUnlock() + + var ( + wg sync.WaitGroup + mu sync.Mutex + firstErr error + startOK = make([]bool, len(servers)) + ) + + for i := range servers { + wg.Add(1) + go func(idx int, s Server) { + defer wg.Done() + if err := s.Start(); err != nil { + mu.Lock() + if firstErr == nil { + firstErr = err + } + mu.Unlock() + app.logger.Errorf(ctx, "server start failed: %v", err) + return + } + mu.Lock() + startOK[idx] = true + mu.Unlock() + }(i, servers[i]) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + var cancelErr error + select { + case <-done: + case <-ctx.Done(): + wg.Wait() + cancelErr = ctx.Err() + } + + if cancelErr != nil { + app.rollbackStartedServers(ctx, servers, startOK) + return cancelErr + } + + if firstErr != nil { + app.rollbackStartedServers(ctx, servers, startOK) + return gerror.WrapCode(gcode.CodeInternalError, firstErr, "app start failed") + } + app.logger.Infof(ctx, "all servers started successfully") + return nil +} + +// Run starts all registered servers and blocks until a shutdown signal is received +// or ctx is canceled. It handles OS signals (SIGINT, SIGTERM, etc.) for graceful +// shutdown when a signal is received. When ctx completes, graceful shutdown runs +// the same Stop path via gctx.NeverDone so propagation metadata is kept without +// canceling cleanup I/O prematurely. +// +// The lifecycle order is: Boot (apply Options) -> Start (start servers) -> +// block on signal or ctx -> Stop (cleanup hooks + stop servers). +// +// Run returns an error if Boot or Start fails, allowing the caller to decide +// whether to exit. On successful startup, Run blocks until shutdown completes +// and returns any error from graceful Stop; nil means shutdown completed cleanly. +// +// App and its registered servers follow a single lifecycle: after the first +// successful Stop, subsequent Stop calls are no-ops. Create a new App instance +// if you need a fresh lifecycle (for example in tests). +// +// If ctx is nil, gctx.GetInitCtx() resolves the root context before Boot. +func (app *App) Run(ctx context.Context) error { + root := app.resolveCtx(ctx) + var ( + stopOnce sync.Once + exitCh = make(chan struct{}) + shutdownErr error + ) + doShutdown := func() { + stopOnce.Do(func() { + shutdownErr = app.Stop(gctx.NeverDone(root), true) + if shutdownErr != nil { + app.logger.Errorf(root, "graceful shutdown failed: %v", shutdownErr) + } + close(exitCh) + }) + } + + if err := app.Boot(root); err != nil { + return gerror.WrapCode(gcode.CodeInternalError, err, "app boot failed") + } + + if err := app.Start(root); err != nil { + if stopErr := app.Stop(root, false); stopErr != nil { + app.logger.Errorf(root, "forceful stop after start failure also failed: %v", stopErr) + } + return gerror.WrapCode(gcode.CodeInternalError, err, "app start failed") + } + + gproc.AddSigHandlerShutdown(func(sig os.Signal) { + app.logger.Infof(root, "received shutdown signal: %s, shutting down...", sig.String()) + doShutdown() + }) + + app.waitForRunExit(root, doShutdown, exitCh) + return shutdownErr +} + +// waitForRunExit blocks until shutdown completes from either context cancellation +// or an OS shutdown signal. Signal handlers are registered before this call; +// StartListen starts the background listener without blocking on waitChan so +// context-driven shutdown can return without leaving a goroutine stuck in Listen. +func (app *App) waitForRunExit(ctx context.Context, doShutdown func(), shutdownDone <-chan struct{}) { + go func() { + <-ctx.Done() + doShutdown() + }() + + gproc.StartListen() + + <-shutdownDone +} + +// Stop stops all registered servers. +// The parameter graceful indicates whether to stop gracefully. +// Cleanup hooks from Options are run in reverse order first, +// then servers are stopped in reverse registration order. +// +// Stop is idempotent: calling it multiple times is safe and only the first +// call executes the shutdown logic. After Stop completes, the App cannot +// shut down servers again; register servers and call Start on a new App for +// another lifecycle round. +// +// If ctx is nil, lifecycleCtx or gctx.GetInitCtx() is used for hooks and logs. +func (app *App) Stop(ctx context.Context, graceful bool) error { + var firstErr error + app.stopOnce.Do(func() { + opCtx := app.resolveCtx(ctx) + + // Run cleanup hooks in reverse order first. + app.runHooksReverse(opCtx) + + // Copy server list under the lock, then release before stopping + // to avoid holding the read lock during potentially long server shutdown. + app.mu.RLock() + servers := make([]Server, len(app.servers)) + copy(servers, app.servers) + app.mu.RUnlock() + + if len(servers) == 0 { + return + } + + // Stop in reverse order. + for i := len(servers) - 1; i >= 0; i-- { + server := servers[i] + if err := server.Stop(graceful); err != nil { + if firstErr == nil { + firstErr = err + } + app.logger.Errorf(opCtx, "server stop failed: %v", err) + } + } + + mode := shutdownModeGracefully + if !graceful { + mode = shutdownModeForcefully + } + app.logger.Infof(opCtx, "all servers stopped %s", mode) + }) + if firstErr != nil { + return gerror.WrapCode(gcode.CodeInternalError, firstErr, "app stop failed") + } + return nil +} + +// Servers returns a copy of the registered servers list. +func (app *App) Servers() []Server { + app.mu.RLock() + defer app.mu.RUnlock() + + servers := make([]Server, len(app.servers)) + copy(servers, app.servers) + return servers +} diff --git a/os/gapp/gapp_option.go b/os/gapp/gapp_option.go new file mode 100644 index 00000000000..df9e7234cab --- /dev/null +++ b/os/gapp/gapp_option.go @@ -0,0 +1,49 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Option definitions for app initialization. + +package gapp + +import "context" + +// Option is the interface for app initialization options. +// It is applied during App.Boot() to configure application-level +// concerns before servers start. +// +// Implementations can return an optional cleanup function from Apply, +// which will be called during App.Stop(ctx, graceful) in reverse registration order. +type Option interface { + // Apply initializes resources for the application. + // It returns an optional cleanup function that will be called + // during App.Stop(ctx, graceful), or nil if no cleanup is needed. + Apply(ctx context.Context, app *App) (func(ctx context.Context), error) +} + +// optionFunc is an adapter to allow the use of ordinary functions as Options. +type optionFunc func(ctx context.Context, app *App) (func(ctx context.Context), error) + +// Apply implements the Option interface. +func (f optionFunc) Apply(ctx context.Context, app *App) (func(ctx context.Context), error) { + return f(ctx, app) +} + +// NewOption creates an Option from a simple initialization function. +// The function runs once during Boot(). No cleanup is registered. +func NewOption(f func(ctx context.Context, app *App)) Option { + return optionFunc(func(ctx context.Context, app *App) (func(ctx context.Context), error) { + f(ctx, app) + return nil, nil + }) +} + +// NewOptionWithHook creates an Option from an initialization function +// that also returns an optional cleanup function. +// The init function runs during Boot(), and the returned cleanup +// function (if non-nil) runs during App.Stop(ctx, graceful) in reverse registration order. +func NewOptionWithHook(f func(ctx context.Context, app *App) (func(ctx context.Context), error)) Option { + return optionFunc(f) +} diff --git a/os/gapp/gapp_server.go b/os/gapp/gapp_server.go new file mode 100644 index 00000000000..ac4bacf882a --- /dev/null +++ b/os/gapp/gapp_server.go @@ -0,0 +1,171 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Adapter implementations for built-in server types. + +package gapp + +import ( + "time" + + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/net/gtcp" + "github.com/gogf/gf/v2/net/gudp" +) + +// adapterStartTimeout is the maximum duration to wait for a server +// to indicate it is listening after Start() is called. +const adapterStartTimeout = time.Second * 2 + +// adapterStartPollInterval is the interval between readiness polls. +const adapterStartPollInterval = time.Millisecond * 10 + +// httpServerAdapter wraps ghttp.Server to implement the Server interface. +type httpServerAdapter struct { + server *ghttp.Server +} + +// NewHTTPServerAdapter creates and returns a Server adapter for ghttp.Server. +func NewHTTPServerAdapter(server *ghttp.Server) Server { + return &httpServerAdapter{server: server} +} + +// Start starts the HTTP server in non-blocking way. +func (a *httpServerAdapter) Start() error { + return a.server.Start() +} + +// Stop stops the HTTP server. +// When graceful is true, it waits for in-flight requests to complete. +// When graceful is false, it forcibly closes all connections. +func (a *httpServerAdapter) Stop(graceful bool) error { + if graceful { + return a.server.Shutdown() + } + return a.server.Close() +} + +// tcpServerAdapter wraps gtcp.Server to implement the Server interface. +type tcpServerAdapter struct { + server *gtcp.Server +} + +// NewTCPServerAdapter creates and returns a Server adapter for gtcp.Server. +func NewTCPServerAdapter(server *gtcp.Server) Server { + return &tcpServerAdapter{server: server} +} + +// Start starts the TCP server in non-blocking way. +// Since gtcp.Server.Run() is blocking, it is launched in a goroutine +// and the method polls until the listener is ready. +func (a *tcpServerAdapter) Start() error { + var ( + errCh = make(chan error, 1) + ) + go func() { + if err := a.server.Run(); err != nil { + select { + case errCh <- err: + default: + } + } + }() + + // Poll until the listener is established or timeout. + deadline := time.Now().Add(adapterStartTimeout) + for time.Now().Before(deadline) { + if a.server.GetListenedPort() > 0 { + return nil + } + select { + case err := <-errCh: + return gerror.WrapCode(gcode.CodeInternalError, err, "tcp server start failed") + default: + } + time.Sleep(adapterStartPollInterval) + } + + select { + case err := <-errCh: + return gerror.WrapCode(gcode.CodeInternalError, err, "tcp server start failed") + default: + } + // Best-effort cleanup of the leaked goroutine and port on timeout. + a.server.Close() + return gerror.NewCode(gcode.CodeOperationFailed, "tcp server failed to start within timeout") +} + +// Post-start errors from the underlying Run() goroutine are sent to errCh +// but are not read after Start() returns successfully. A server that fails +// after startup will appear healthy while being non-functional. Callers +// needing post-start error monitoring should wrap the adapter with their +// own error notification mechanism. + +// Stop stops the TCP server. +// Since TCP has no built-in graceful shutdown concept, both graceful and forceful +// stop close the listener immediately. +func (a *tcpServerAdapter) Stop(_ bool) error { + return a.server.Close() +} + +// udpServerAdapter wraps gudp.Server to implement the Server interface. +type udpServerAdapter struct { + server *gudp.Server +} + +// NewUDPServerAdapter creates and returns a Server adapter for gudp.Server. +func NewUDPServerAdapter(server *gudp.Server) Server { + return &udpServerAdapter{server: server} +} + +// Start starts the UDP server in non-blocking way. +// Since gudp.Server.Run() is blocking, it is launched in a goroutine +// and the method polls until the connection is ready. +func (a *udpServerAdapter) Start() error { + var ( + errCh = make(chan error, 1) + ) + go func() { + if err := a.server.Run(); err != nil { + select { + case errCh <- err: + default: + } + } + }() + + // Poll until the connection is established or timeout. + deadline := time.Now().Add(adapterStartTimeout) + for time.Now().Before(deadline) { + if a.server.GetListenedPort() > 0 { + return nil + } + select { + case err := <-errCh: + return gerror.WrapCode(gcode.CodeInternalError, err, "udp server start failed") + default: + } + time.Sleep(adapterStartPollInterval) + } + + select { + case err := <-errCh: + return gerror.WrapCode(gcode.CodeInternalError, err, "udp server start failed") + default: + } + // Best-effort cleanup of the leaked goroutine and connection on timeout. + a.server.Close() + return gerror.NewCode(gcode.CodeOperationFailed, "udp server failed to start within timeout") +} + +// Stop stops the UDP server. +// Since UDP has no built-in graceful shutdown concept, both graceful and forceful +// stop close the connection immediately. +func (a *udpServerAdapter) Stop(_ bool) error { + return a.server.Close() +} diff --git a/os/gapp/gapp_watch_shutdown_internal_test.go b/os/gapp/gapp_watch_shutdown_internal_test.go new file mode 100644 index 00000000000..d72dc8da03b --- /dev/null +++ b/os/gapp/gapp_watch_shutdown_internal_test.go @@ -0,0 +1,74 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// Tests verify waitForRunExit without blocking on gproc.Listen waitChan. +package gapp + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/test/gtest" +) + +// recorderServer captures Stop for waitForRunExit tests. +type recorderServer struct { + stopped int32 +} + +// Start succeeds immediately with no listeners. +func (r *recorderServer) Start() error { + return nil +} + +// Stop records that shutdown ran. +func (r *recorderServer) Stop(_ bool) error { + atomic.StoreInt32(&r.stopped, 1) + return nil +} + +func TestWaitForRunExitTriggersGracefulStopOnContextCancel(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + rec := &recorderServer{} + appInstance := New(rec) + + root, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.AssertNil(appInstance.Boot(root)) + t.AssertNil(appInstance.Start(root)) + + var ( + stopOnce sync.Once + exitCh = make(chan struct{}) + ) + doShutdown := func() { + stopOnce.Do(func() { + t.AssertNil(appInstance.Stop(gctx.NeverDone(root), true)) + close(exitCh) + }) + } + + done := make(chan struct{}) + go func() { + appInstance.waitForRunExit(root, doShutdown, exitCh) + close(done) + }() + + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForRunExit did not return after context cancellation") + } + t.Assert(atomic.LoadInt32(&rec.stopped), int32(1)) + }) +} diff --git a/os/gapp/gapp_z_unit_test.go b/os/gapp/gapp_z_unit_test.go new file mode 100644 index 00000000000..82d68ceb00f --- /dev/null +++ b/os/gapp/gapp_z_unit_test.go @@ -0,0 +1,955 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gapp_test + +import ( + "context" + "errors" + "io" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/net/gtcp" + "github.com/gogf/gf/v2/net/gudp" + "github.com/gogf/gf/v2/os/gapp" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/test/gtest" +) + +// mockServer is a mock Server implementation for testing. +type mockServer struct { + mu sync.Mutex + started bool + stopped bool + gracefulVal bool + startErr error + stopErr error + name string + startDelay time.Duration + stopFunc func(graceful bool) error +} + +func (m *mockServer) Start() error { + if m.startDelay > 0 { + time.Sleep(m.startDelay) + } + m.mu.Lock() + defer m.mu.Unlock() + m.started = true + return m.startErr +} + +func (m *mockServer) Stop(graceful bool) error { + m.mu.Lock() + defer m.mu.Unlock() + m.stopped = true + m.gracefulVal = graceful + if m.stopFunc != nil { + return m.stopFunc(graceful) + } + return m.stopErr +} + +func TestNew(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + app := gapp.New() + t.AssertNE(app, nil) + t.Assert(len(app.Servers()), 0) + }) +} + +func TestNewWithServers(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1"} + s2 := &mockServer{name: "s2"} + app := gapp.New(s1, s2) + t.Assert(len(app.Servers()), 2) + }) +} + +func TestAdd(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1"} + app := gapp.New(s1) + t.Assert(len(app.Servers()), 1) + + s2 := &mockServer{name: "s2"} + s3 := &mockServer{name: "s3"} + app.Add(s2, s3) + t.Assert(len(app.Servers()), 3) + }) +} + +func TestAppStartStop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1"} + s2 := &mockServer{name: "s2"} + app := gapp.New(s1, s2) + + err := app.Start(context.Background()) + t.AssertNil(err) + t.Assert(s1.started, true) + t.Assert(s2.started, true) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + t.Assert(s1.stopped, true) + t.Assert(s2.stopped, true) + t.Assert(s1.gracefulVal, true) + t.Assert(s2.gracefulVal, true) + }) +} + +func TestAppStopForceful(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1"} + app := gapp.New(s1) + + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), false) + t.AssertNil(err) + t.Assert(s1.stopped, true) + t.Assert(s1.gracefulVal, false) + }) +} + +func TestAppStopReverseOrder(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + stopSeq []string + ) + + s1 := &mockServer{name: "s1", stopFunc: func(_ bool) error { + mu.Lock() + stopSeq = append(stopSeq, "s1") + mu.Unlock() + return nil + }} + s2 := &mockServer{name: "s2", stopFunc: func(_ bool) error { + mu.Lock() + stopSeq = append(stopSeq, "s2") + mu.Unlock() + return nil + }} + s3 := &mockServer{name: "s3", stopFunc: func(_ bool) error { + mu.Lock() + stopSeq = append(stopSeq, "s3") + mu.Unlock() + return nil + }} + + app := gapp.New(s1, s2, s3) + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + + mu.Lock() + t.Assert(len(stopSeq), 3) + t.Assert(stopSeq[0], "s3") + t.Assert(stopSeq[1], "s2") + t.Assert(stopSeq[2], "s1") + mu.Unlock() + }) +} + +func TestAppStartError(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1", startErr: errTestStart} + app := gapp.New(s1) + + err := app.Start(context.Background()) + t.AssertNE(err, nil) + }) +} + +func TestAppStartPartialFailureRollback(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1"} + s2 := &mockServer{name: "s2", startErr: errTestStart} + app := gapp.New(s1, s2) + + err := app.Start(context.Background()) + t.AssertNE(err, nil) + t.Assert(s1.started, true) + t.Assert(s2.started, true) + t.Assert(s1.stopped, true) + t.Assert(s1.gracefulVal, false) + }) +} + +func TestAppStopError(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1", stopErr: errTestStop} + app := gapp.New(s1) + + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNE(err, nil) + }) +} + +func TestAppStopEmpty(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + app := gapp.New() + err := app.Stop(context.Background(), true) + t.AssertNil(err) + }) +} + +func TestHTTPServerAdapter(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := ghttp.GetServer("gapp-test-http") + s.SetPort(0) + s.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + adapter := gapp.NewHTTPServerAdapter(s) + err := adapter.Start() + t.AssertNil(err) + + time.Sleep(time.Millisecond * 100) + + err = adapter.Stop(true) + t.AssertNil(err) + }) +} + +func TestHTTPServerAdapterForceful(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := ghttp.GetServer("gapp-test-http-force") + s.SetPort(0) + s.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + adapter := gapp.NewHTTPServerAdapter(s) + err := adapter.Start() + t.AssertNil(err) + + time.Sleep(time.Millisecond * 100) + + err = adapter.Stop(false) + t.AssertNil(err) + }) +} + +func TestTCPServerAdapter(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gtcp.NewServer(":0", func(conn *gtcp.Conn) { + defer conn.Close() + for { + data, err := conn.Recv(-1) + if err != nil { + break + } + conn.Send(data) + } + }) + + adapter := gapp.NewTCPServerAdapter(s) + err := adapter.Start() + t.AssertNil(err) + + err = adapter.Stop(true) + t.AssertNil(err) + }) +} + +func TestUDPServerAdapter(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := gudp.NewServer(":0", func(conn *gudp.ServerConn) { + defer conn.Close() + for { + data, remote, err := conn.Recv(-1) + if err != nil { + if err != io.EOF { + break + } + break + } + if err = conn.Send(data, remote); err != nil { + break + } + } + }) + + adapter := gapp.NewUDPServerAdapter(s) + err := adapter.Start() + t.AssertNil(err) + + err = adapter.Stop(true) + t.AssertNil(err) + }) +} + +func TestAppWithHTTPServer(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := ghttp.GetServer("gapp-test-app-http") + s.SetPort(0) + s.BindHandler("/", func(r *ghttp.Request) { + r.Response.Write("ok") + }) + + adapter := gapp.NewHTTPServerAdapter(s) + app := gapp.New(adapter) + + err := app.Start(context.Background()) + t.AssertNil(err) + + time.Sleep(time.Millisecond * 100) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + }) +} + +func TestNewOption(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var called bool + opt := gapp.NewOption(func(ctx context.Context, app *gapp.App) { + called = true + }) + app := gapp.New() + hook, err := opt.Apply(context.TODO(), app) + t.AssertNil(err) + t.Assert(called, true) + t.Assert(hook, nil) + }) +} + +func TestNewOptionWithHook(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + initCalled bool + cleanupCalled bool + ) + opt := gapp.NewOptionWithHook(func(ctx context.Context, app *gapp.App) (func(ctx context.Context), error) { + initCalled = true + return func(ctx context.Context) { + cleanupCalled = true + }, nil + }) + app := gapp.New() + hook, err := opt.Apply(context.TODO(), app) + t.AssertNil(err) + t.Assert(initCalled, true) + t.AssertNE(hook, nil) + + // Call the cleanup hook. + hook(context.TODO()) + t.Assert(cleanupCalled, true) + }) +} + +func TestNewOptionWithHookNilCleanup(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var initCalled bool + opt := gapp.NewOptionWithHook(func(ctx context.Context, app *gapp.App) (func(ctx context.Context), error) { + initCalled = true + return nil, nil + }) + app := gapp.New() + hook, err := opt.Apply(context.TODO(), app) + t.AssertNil(err) + t.Assert(initCalled, true) + t.Assert(hook, nil) + }) +} + +func TestAppOption(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var called bool + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + called = true + })) + err := app.Boot(context.TODO()) + t.AssertNil(err) + t.Assert(called, true) + }) +} + +func TestAppBoot(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + order []string + ) + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + mu.Lock() + order = append(order, "first") + mu.Unlock() + })) + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + mu.Lock() + order = append(order, "second") + mu.Unlock() + })) + + err := app.Boot(context.TODO()) + t.AssertNil(err) + + mu.Lock() + t.Assert(len(order), 2) + t.Assert(order[0], "first") + t.Assert(order[1], "second") + mu.Unlock() + }) +} + +func TestAppBootIdempotent(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var callCount int + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + callCount++ + })) + + err := app.Boot(context.TODO()) + t.AssertNil(err) + t.Assert(callCount, 1) + + // Second call should be a no-op. + err = app.Boot(context.TODO()) + t.AssertNil(err) + t.Assert(callCount, 1) + }) +} + +func TestAppBootError(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + app := gapp.New() + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + return nil, errTestBoot + })) + + err := app.Boot(context.TODO()) + t.AssertNE(err, nil) + t.Assert(app.Booted(), false) + }) +} + +func TestAppBootRollbackOnError(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + cleanupOrder []string + ) + app := gapp.New() + + // First option succeeds and registers a cleanup. + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + return func(ctx context.Context) { + mu.Lock() + cleanupOrder = append(cleanupOrder, "first") + mu.Unlock() + }, nil + })) + + // Second option fails, triggering rollback. + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + return nil, errTestBoot + })) + + err := app.Boot(context.TODO()) + t.AssertNE(err, nil) + + mu.Lock() + t.Assert(len(cleanupOrder), 1) + t.Assert(cleanupOrder[0], "first") + mu.Unlock() + }) +} + +func TestAppStartAutoBoot(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var bootCalled bool + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + bootCalled = true + })) + + // Start without explicitly calling Boot. + t.Assert(app.Booted(), false) + err := app.Start(context.Background()) + t.AssertNil(err) + t.Assert(bootCalled, true) + t.Assert(app.Booted(), true) + }) +} + +func TestAppStopRunsHooks(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + cleanupOrder []string + ) + s1 := &mockServer{name: "s1"} + app := gapp.New(s1) + + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + return func(ctx context.Context) { + mu.Lock() + cleanupOrder = append(cleanupOrder, "hook1") + mu.Unlock() + }, nil + })) + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + return func(ctx context.Context) { + mu.Lock() + cleanupOrder = append(cleanupOrder, "hook2") + mu.Unlock() + }, nil + })) + + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + + mu.Lock() + // Hooks run in reverse order. + t.Assert(len(cleanupOrder), 2) + t.Assert(cleanupOrder[0], "hook2") + t.Assert(cleanupOrder[1], "hook1") + mu.Unlock() + }) +} + +func TestMultipleOptionsOrder(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + applyOrder []string + cleanupOrder []string + ) + app := gapp.New() + + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + mu.Lock() + applyOrder = append(applyOrder, "first") + mu.Unlock() + return func(ctx context.Context) { + mu.Lock() + cleanupOrder = append(cleanupOrder, "first") + mu.Unlock() + }, nil + })) + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + mu.Lock() + applyOrder = append(applyOrder, "second") + mu.Unlock() + return func(ctx context.Context) { + mu.Lock() + cleanupOrder = append(cleanupOrder, "second") + mu.Unlock() + }, nil + })) + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + mu.Lock() + applyOrder = append(applyOrder, "third") + mu.Unlock() + return func(ctx context.Context) { + mu.Lock() + cleanupOrder = append(cleanupOrder, "third") + mu.Unlock() + }, nil + })) + + s1 := &mockServer{name: "s1"} + app.Add(s1) + + err := app.Start(context.Background()) + t.AssertNil(err) + + mu.Lock() + t.Assert(len(applyOrder), 3) + t.Assert(applyOrder[0], "first") + t.Assert(applyOrder[1], "second") + t.Assert(applyOrder[2], "third") + mu.Unlock() + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + + mu.Lock() + // Cleanups run in reverse order. + t.Assert(len(cleanupOrder), 3) + t.Assert(cleanupOrder[0], "third") + t.Assert(cleanupOrder[1], "second") + t.Assert(cleanupOrder[2], "first") + mu.Unlock() + }) +} + +func TestOptionCanAddServers(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, a *gapp.App) { + s := &mockServer{name: "dynamic"} + a.Add(s) + })) + + t.Assert(len(app.Servers()), 0) + err := app.Boot(context.TODO()) + t.AssertNil(err) + t.Assert(len(app.Servers()), 1) + }) +} + +func TestBootedAccessor(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + app := gapp.New() + t.Assert(app.Booted(), false) + + err := app.Boot(context.TODO()) + t.AssertNil(err) + t.Assert(app.Booted(), true) + }) +} + +func TestAppBootHookRunsBeforeServerStop(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + order []string + ) + s1 := &mockServer{name: "s1", stopFunc: func(_ bool) error { + mu.Lock() + order = append(order, "server") + mu.Unlock() + return nil + }} + app := gapp.New(s1) + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, a *gapp.App) (func(ctx context.Context), error) { + return func(ctx context.Context) { + mu.Lock() + order = append(order, "hook") + mu.Unlock() + }, nil + })) + + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + + mu.Lock() + // Hook runs before server stop. + t.Assert(len(order), 2) + t.Assert(order[0], "hook") + t.Assert(order[1], "server") + mu.Unlock() + }) +} + +func TestAppBootConcurrent(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var applyCount int32 + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, _ *gapp.App) { + atomic.AddInt32(&applyCount, 1) + time.Sleep(50 * time.Millisecond) + })) + + const callers = 20 + var wg sync.WaitGroup + wg.Add(callers) + for i := 0; i < callers; i++ { + go func() { + defer wg.Done() + t.AssertNil(app.Boot(context.Background())) + }() + } + wg.Wait() + + t.Assert(atomic.LoadInt32(&applyCount), int32(1)) + t.Assert(app.Booted(), true) + }) +} + +func TestAppBootConcurrentFailure(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var applyCount int32 + app := gapp.New() + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, _ *gapp.App) (func(context.Context), error) { + atomic.AddInt32(&applyCount, 1) + time.Sleep(50 * time.Millisecond) + return nil, errTestBoot + })) + + const callers = 10 + errs := make([]error, callers) + var wg sync.WaitGroup + wg.Add(callers) + for i := 0; i < callers; i++ { + idx := i + go func() { + defer wg.Done() + errs[idx] = app.Boot(context.Background()) + }() + } + wg.Wait() + + t.Assert(atomic.LoadInt32(&applyCount), int32(1)) + t.Assert(app.Booted(), false) + for i := range errs { + t.AssertNE(errs[i], nil) + } + }) +} + +func TestAppContextPropagationBootHook(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var ( + mu sync.Mutex + bootApplyID string + stopCleanupID string + ) + + traceCtx := gctx.WithSpan(context.Background(), "test-app-span") + traceID := gctx.CtxId(traceCtx) + + app := gapp.New() + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, _ *gapp.App) (func(context.Context), error) { + mu.Lock() + bootApplyID = gctx.CtxId(ctx) + mu.Unlock() + + return func(ctx context.Context) { + mu.Lock() + stopCleanupID = gctx.CtxId(ctx) + mu.Unlock() + }, nil + })) + + s1 := &mockServer{name: "s1"} + + app.Add(s1) + + err := app.Boot(traceCtx) + t.AssertNil(err) + t.Assert(bootApplyID, traceID) + + err = app.Start(traceCtx) + t.AssertNil(err) + + err = app.Stop(traceCtx, true) + t.AssertNil(err) + mu.Lock() + t.Assert(stopCleanupID, traceID) + mu.Unlock() + }) +} + +func TestAppStartRespectsContextCancel(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1", startDelay: 200 * time.Millisecond} + app := gapp.New(s1) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond) + defer cancel() + + err := app.Start(ctx) + t.Assert(errors.Is(err, context.DeadlineExceeded), true) + t.Assert(s1.stopped, true) + }) +} + +func TestAppNilBootUsesNormalizedContext(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var mu sync.Mutex + gotCtx := false + app := gapp.New() + app.Option(gapp.NewOption(func(ctx context.Context, _ *gapp.App) { + mu.Lock() + gotCtx = ctx != nil + mu.Unlock() + })) + + err := app.Boot(nil) + t.AssertNil(err) + mu.Lock() + t.Assert(gotCtx, true) + mu.Unlock() + }) +} + +func TestAppLifecycleContextDefaultsAfterBoot(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var mu sync.Mutex + stopCleanupID := "" + + traceBoot := gctx.WithSpan(context.Background(), "default-lifecycle") + traceID := gctx.CtxId(traceBoot) + + app := gapp.New(&mockServer{name: "s1"}) + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, _ *gapp.App) (func(context.Context), error) { + return func(ctx context.Context) { + mu.Lock() + stopCleanupID = gctx.CtxId(ctx) + mu.Unlock() + }, nil + })) + + err := app.Boot(traceBoot) + t.AssertNil(err) + + err = app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(nil, true) + t.AssertNil(err) + + mu.Lock() + t.Assert(stopCleanupID, traceID) + mu.Unlock() + }) +} + +func TestAppRunBootFailure(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + app := gapp.New(&mockServer{name: "s1"}) + app.Option(gapp.NewOptionWithHook(func(ctx context.Context, _ *gapp.App) (func(context.Context), error) { + return nil, errTestBoot + })) + + err := app.Run(context.Background()) + t.AssertNE(err, nil) + }) +} + +func TestAppRunStartFailure(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1", startErr: errTestStart} + app := gapp.New(s1) + + err := app.Run(context.Background()) + t.AssertNE(err, nil) + }) +} + +func TestAppRunContextCancelGracefulShutdown(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1"} + app := gapp.New(s1) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + done <- app.Run(ctx) + }() + + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case err := <-done: + t.AssertNil(err) + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after context cancellation") + } + t.Assert(s1.started, true) + t.Assert(s1.stopped, true) + t.Assert(s1.gracefulVal, true) + }) +} + +func TestAppRunReturnsShutdownError(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s1 := &mockServer{name: "s1", stopErr: errTestStop} + app := gapp.New(s1) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + done <- app.Run(ctx) + }() + + time.Sleep(50 * time.Millisecond) + cancel() + + select { + case err := <-done: + t.AssertNE(err, nil) + case <-time.After(2 * time.Second): + t.Fatal("Run did not return after context cancellation") + } + t.Assert(s1.stopped, true) + }) +} + +func TestAppStopIdempotent(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var stopCount int32 + s1 := &mockServer{name: "s1", stopFunc: func(_ bool) error { + atomic.AddInt32(&stopCount, 1) + return nil + }} + app := gapp.New(s1) + + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&stopCount), int32(1)) + }) +} + +func TestAppStartAfterStopSecondStopIsNoOp(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + var stopCount int32 + s1 := &mockServer{name: "s1", stopFunc: func(_ bool) error { + atomic.AddInt32(&stopCount, 1) + return nil + }} + app := gapp.New(s1) + + err := app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&stopCount), int32(1)) + + err = app.Start(context.Background()) + t.AssertNil(err) + + err = app.Stop(context.Background(), true) + t.AssertNil(err) + t.Assert(atomic.LoadInt32(&stopCount), int32(1)) + }) +} + +var ( + errTestStart = newTestError("start failed") + errTestStop = newTestError("stop failed") + errTestBoot = newTestError("boot failed") +) + +type testError string + +func newTestError(msg string) error { return testError(msg) } +func (e testError) Error() string { return string(e) } diff --git a/os/gproc/gproc_signal.go b/os/gproc/gproc_signal.go index 30efc61f371..245337bc3e8 100644 --- a/os/gproc/gproc_signal.go +++ b/os/gproc/gproc_signal.go @@ -70,12 +70,17 @@ func AddSigHandlerShutdown(handler ...SigHandler) { notifySignals() } -// Listen blocks and does signal listening and handling. -func Listen() { +// StartListen starts signal listening in the background without blocking. +// It is safe to call multiple times; the listener goroutine is started at most once. +func StartListen() { listenOnce.Do(func() { go listen() }) +} +// Listen blocks and does signal listening and handling. +func Listen() { + StartListen() <-waitChan }