Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions contrib/rpc/grpcx/grpcx_gapp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

// Package grpcx provides gapp Server adapter for GrpcServer.
package grpcx

import (
"time"

"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/gapp"
)

// adapterStartTimeout is the maximum duration to wait for a gRPC server
// to indicate it is listening after Start() is called.
const grpcAdapterStartTimeout = time.Second * 2

// adapterStartPollInterval is the interval between readiness polls.
const grpcAdapterStartPollInterval = time.Millisecond * 10

// GrpcServerAdapter wraps GrpcServer to implement the gapp.Server interface.
type GrpcServerAdapter struct {
server *GrpcServer
}

// NewGappServerAdapter creates and returns a gapp.Server adapter for GrpcServer.
func NewGappServerAdapter(server *GrpcServer) gapp.Server {
return &GrpcServerAdapter{server: server}
}

// Start starts the gRPC server in non-blocking way without registering signal handlers.
func (a *GrpcServerAdapter) Start() error {
if err := a.server.StartManaged(); err != nil {
return gerror.WrapCode(gcode.CodeInternalError, err, "grpc server start failed")
}

// Poll until the listener is established or timeout.
deadline := time.Now().Add(grpcAdapterStartTimeout)
for time.Now().Before(deadline) {
if a.server.GetListenedPort() > 0 {
return nil
}
time.Sleep(grpcAdapterStartPollInterval)
}
// serve() succeeded but we cannot confirm readiness; clean up the running server.
a.server.StopForceful()
return gerror.NewCode(gcode.CodeOperationFailed, "grpc server failed to start within timeout")
}

// Stop stops the gRPC server.
// When graceful is true, it waits for in-flight RPCs to complete.
// When graceful is false, it forcibly stops the server immediately.
// Note: GrpcServer.Stop/StopForceful do not return errors, so this
// method always returns nil.
func (a *GrpcServerAdapter) Stop(graceful bool) error {
if graceful {
a.server.Stop()
} else {
a.server.StopForceful()
}
return nil
}
82 changes: 82 additions & 0 deletions contrib/rpc/grpcx/grpcx_gapp_z_unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.

// Unit tests for gapp.Server adapter wrapping GrpcServer.
package grpcx_test

import (
"testing"

"github.com/gogf/gf/v2/os/gapp"
"github.com/gogf/gf/v2/test/gtest"
"github.com/gogf/gf/v2/util/guid"

"github.com/gogf/gf/contrib/rpc/grpcx/v2"
"github.com/gogf/gf/contrib/rpc/grpcx/v2/testdata/controller"
)

func Test_Grpcx_GappServerAdapter_ImplementsGappServer(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
c := grpcx.Server.NewConfig()
c.Name = guid.S()
s := grpcx.Server.New(c)
var _ gapp.Server = grpcx.NewGappServerAdapter(s)
})
}

func Test_Grpcx_StartManagedDoesNotRegisterSignalHandler(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
c := grpcx.Server.NewConfig()
c.Name = guid.S()
s := grpcx.Server.New(c)
controller.Register(s)

err := s.StartManaged()
t.AssertNil(err)
t.Assert(s.GetListenedPort() > 0, true)

err = s.StartManaged()
t.AssertNE(err, nil)

s.StopForceful()
})
}

func Test_Grpcx_GappServerAdapter_StartGracefulStop(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
c := grpcx.Server.NewConfig()
c.Name = guid.S()
s := grpcx.Server.New(c)
controller.Register(s)

ad := grpcx.NewGappServerAdapter(s)

err := ad.Start()
t.AssertNil(err)
t.Assert(s.GetListenedPort() > 0, true)

err = ad.Stop(true)
t.AssertNil(err)
})
}

func Test_Grpcx_GappServerAdapter_StopForceful(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
c := grpcx.Server.NewConfig()
c.Name = guid.S()
s := grpcx.Server.New(c)
controller.Register(s)

ad := grpcx.NewGappServerAdapter(s)

err := ad.Start()
t.AssertNil(err)
t.Assert(s.GetListenedPort() > 0, true)

err = ad.Stop(false)
t.AssertNil(err)
})
}
72 changes: 49 additions & 23 deletions contrib/rpc/grpcx/grpcx_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ import (

// GrpcServer is the server for GRPC protocol.
type GrpcServer struct {
Server *grpc.Server
config *GrpcServerConfig
listener net.Listener
services []gsvc.Service
waitGroup sync.WaitGroup
registrar gsvc.Registrar
serviceMu sync.Mutex
Server *grpc.Server
config *GrpcServerConfig
listenerMu sync.RWMutex
listener net.Listener
services []gsvc.Service
waitGroup sync.WaitGroup
registrar gsvc.Registrar
serviceMu sync.Mutex
}

// Service implements gsvc.Service interface.
Expand Down Expand Up @@ -95,35 +96,52 @@ func (s *GrpcServer) Service(services ...gsvc.Service) {
s.services = append(s.services, services...)
}

// Run starts the server in blocking way.
func (s *GrpcServer) Run() {
var (
err error
ctx = gctx.GetInitCtx()
)
// Create listener to bind listening ip and port.
s.listener, err = net.Listen("tcp", s.config.Address)
// serve binds the listener, starts serving asynchronously, and registers services.
// It does not block on OS signal handling and is intended for external lifecycle managers.
func (s *GrpcServer) serve() error {
ctx := gctx.GetInitCtx()

s.listenerMu.Lock()
if s.listener != nil {
s.listenerMu.Unlock()
return gerror.NewCode(gcode.CodeInvalidOperation, "grpc server already started")
}
listener, err := net.Listen("tcp", s.config.Address)
if err != nil {
s.Logger().Fatalf(ctx, `%+v`, err)
s.listenerMu.Unlock()
return err
}
s.listener = listener
s.listenerMu.Unlock()

// Start listening.
go s.doServeAsynchronously(ctx)

// Service register.
go s.doServeAsynchronously(ctx, listener)
s.doServiceRegister()
s.Logger().Infof(
ctx,
"pid[%d]: grpc server started listening on [%s]",
gproc.Pid(), s.GetListenedAddress(),
)
s.doSignalListen()
return nil
}

func (s *GrpcServer) doServeAsynchronously(ctx context.Context) {
if err := s.Server.Serve(s.listener); err != nil {
// Run starts the server in blocking way.
func (s *GrpcServer) Run() {
ctx := gctx.GetInitCtx()
if err := s.serve(); err != nil {
s.Logger().Fatalf(ctx, `%+v`, err)
}
s.doSignalListen()
}

// StartManaged starts serving under external lifecycle management without signal handling.
func (s *GrpcServer) StartManaged() error {
return s.serve()
}

func (s *GrpcServer) doServeAsynchronously(ctx context.Context, listener net.Listener) {
if err := s.Server.Serve(listener); err != nil && err != grpc.ErrServerStopped {
s.Logger().Errorf(ctx, `grpc server serve error: %+v`, err)
}
}

// doSignalListen does signal listening and handling for gracefully shutdown.
Expand Down Expand Up @@ -225,6 +243,12 @@ func (s *GrpcServer) Stop() {
s.Server.GracefulStop()
}

// StopForceful forcibly stops the server and de-registers services from the registry.
func (s *GrpcServer) StopForceful() {
s.doServiceDeregister()
s.Server.Stop()
}

// GetConfig returns the configuration of current Server.
func (s *GrpcServer) GetConfig() *GrpcServerConfig {
return s.config
Expand All @@ -245,6 +269,8 @@ func (s *GrpcServer) GetListenedAddress() string {

// GetListenedPort retrieves and returns one port which is listened to by current server.
func (s *GrpcServer) GetListenedPort() int {
s.listenerMu.RLock()
defer s.listenerMu.RUnlock()
if ln := s.listener; ln != nil {
return ln.Addr().(*net.TCPAddr).Port
}
Expand Down
Loading
Loading