Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/graceful-stop/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Graceful Stop Demo

This example demonstrates the intended shutdown behavior after the gRPC graceful-stop patch.

## Run

```bash
go run ./examples/graceful-stop
```

## Expected behavior

- one long-running RPC starts
- shutdown begins while that RPC is still running
- new RPCs stop being accepted shortly after shutdown starts
- the in-flight RPC is allowed to finish

Typical output:

```text
long RPC is running; starting shutdown
new RPC rejected after shutdown began: ...
long RPC completed: slept for 1500ms
done
```

There may be a small race window where the first post-stop RPC is still accepted once before subsequent new RPCs are rejected. The important part is that in-flight RPCs are drained while new RPCs are cut off.

## Automated check

```bash
go test ./server/grpc -run TestGracefulStopRejectsNewRPCsButAllowsInFlightRPCs -v
```

## Environment

- no special environment variables are required
- the demo may print a TLS warning from `go-micro`; it is unrelated to this change
129 changes: 129 additions & 0 deletions examples/graceful-stop/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package main

import (
"context"
"fmt"
"log"
"net"
"time"

micro "go-micro.dev/v5"
"go-micro.dev/v5/client"
grpcclient "go-micro.dev/v5/client/grpc"
"go-micro.dev/v5/registry"
"go-micro.dev/v5/server"
grpcserver "go-micro.dev/v5/server/grpc"
)

type SleepRequest struct {
DelayMS int `json:"delay_ms"`
}

type SleepResponse struct {
Message string `json:"message"`
}

type Sleeper struct {
started chan struct{}
}

func (s *Sleeper) Sleep(ctx context.Context, req *SleepRequest, rsp *SleepResponse) error {
select {
case s.started <- struct{}{}:
default:
}

timer := time.NewTimer(time.Duration(req.DelayMS) * time.Millisecond)
defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}

rsp.Message = fmt.Sprintf("slept for %dms", req.DelayMS)
return nil
}

func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
log.Fatal(err)
}
defer listener.Close()

addr := listener.Addr().String()
reg := registry.NewMemoryRegistry()
handler := &Sleeper{started: make(chan struct{}, 1)}

service := micro.New("grace-demo",
micro.HandleSignal(false),
micro.Registry(reg),
micro.Server(grpcserver.NewServer(
server.Registry(reg),
server.Name("grace-demo"),
server.Address(addr),
grpcserver.Listener(listener),
grpcserver.GracefulStopTimeout(3*time.Second),
)),
micro.Client(grpcclient.NewClient(
client.Registry(reg),
client.ContentType("application/grpc+json"),
client.DialTimeout(200*time.Millisecond),
client.RequestTimeout(5*time.Second),
)),
)

if err := service.Handle(handler); err != nil {
log.Fatal(err)
}
if err := service.Start(); err != nil {
log.Fatal(err)
}

log.Printf("service started on %s", addr)

longDone := make(chan error, 1)
go func() {
req := service.Client().NewRequest("grace-demo", "Sleeper.Sleep", &SleepRequest{DelayMS: 1500})
rsp := &SleepResponse{}
longDone <- service.Client().Call(context.Background(), req, rsp, client.WithAddress(addr))
if rsp.Message != "" {
log.Printf("long RPC completed: %s", rsp.Message)
}
}()

<-handler.started
log.Printf("long RPC is running; starting shutdown")

stopDone := make(chan error, 1)
go func() {
stopDone <- service.Stop()
}()

deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
callCtx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
req := service.Client().NewRequest("grace-demo", "Sleeper.Sleep", &SleepRequest{DelayMS: 50})
rsp := &SleepResponse{}
err = service.Client().Call(callCtx, req, rsp, client.WithAddress(addr))
cancel()
if err != nil {
log.Printf("new RPC rejected after shutdown began: %v", err)
break
}

log.Printf("new RPC still accepted during shutdown: %s", rsp.Message)
time.Sleep(10 * time.Millisecond)
}

if err := <-longDone; err != nil {
log.Fatalf("long RPC failed: %v", err)
}
if err := <-stopDone; err != nil {
log.Fatalf("service stop failed: %v", err)
}

log.Printf("done")
}
146 changes: 146 additions & 0 deletions server/grpc/graceful_stop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package grpc

import (
"context"
"net"
"testing"
"time"

micro "go-micro.dev/v5"
"go-micro.dev/v5/client"
grpcclient "go-micro.dev/v5/client/grpc"
"go-micro.dev/v5/registry"
"go-micro.dev/v5/server"
)

type SleepRequest struct {
DelayMS int `json:"delay_ms"`
}

type SleepResponse struct {
Message string `json:"message"`
}

type SleepHandler struct {
started chan struct{}
}

func (h *SleepHandler) Sleep(ctx context.Context, req *SleepRequest, rsp *SleepResponse) error {
select {
case h.started <- struct{}{}:
default:
}

timer := time.NewTimer(time.Duration(req.DelayMS) * time.Millisecond)
defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}

rsp.Message = "ok"
return nil
}

func TestGracefulStopRejectsNewRPCsButAllowsInFlightRPCs(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen: %v", err)
}
defer listener.Close()

addr := listener.Addr().String()
reg := registry.NewMemoryRegistry()
handler := &SleepHandler{started: make(chan struct{}, 1)}

svc := micro.New("grace-demo",
micro.HandleSignal(false),
micro.Registry(reg),
micro.Server(NewServer(
server.Registry(reg),
server.Name("grace-demo"),
server.Address(addr),
Listener(listener),
GracefulStopTimeout(3*time.Second),
)),
micro.Client(grpcclient.NewClient(
client.Registry(reg),
client.ContentType("application/grpc+json"),
client.DialTimeout(200*time.Millisecond),
client.RequestTimeout(5*time.Second),
)),
)

if err := svc.Handle(handler); err != nil {
t.Fatalf("handle: %v", err)
}
if err := svc.Start(); err != nil {
t.Fatalf("start: %v", err)
}

stopped := false
defer func() {
if !stopped {
_ = svc.Stop()
}
}()

longDone := make(chan error, 1)
go func() {
req := svc.Client().NewRequest("grace-demo", "SleepHandler.Sleep", &SleepRequest{DelayMS: 1000})
rsp := &SleepResponse{}
longDone <- svc.Client().Call(context.Background(), req, rsp, client.WithAddress(addr))
}()

select {
case <-handler.started:
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for long RPC to start")
}

stopDone := make(chan error, 1)
go func() {
stopDone <- svc.Stop()
}()

freshReq := svc.Client().NewRequest("grace-demo", "SleepHandler.Sleep", &SleepRequest{DelayMS: 10})
freshRsp := &SleepResponse{}
var rejectErr error

deadline := time.Now().Add(300 * time.Millisecond)
for time.Now().Before(deadline) {
callCtx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
err := svc.Client().Call(callCtx, freshReq, freshRsp, client.WithAddress(addr))
cancel()
if err != nil {
rejectErr = err
break
}
time.Sleep(10 * time.Millisecond)
}

if rejectErr == nil {
t.Fatal("expected a new RPC to be rejected shortly after shutdown started")
}

select {
case err := <-longDone:
if err != nil {
t.Fatalf("long RPC failed during graceful stop: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for in-flight RPC to finish")
}

select {
case err := <-stopDone:
if err != nil {
t.Fatalf("stop: %v", err)
}
stopped = true
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for server stop")
}
}
41 changes: 29 additions & 12 deletions server/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"go-micro.dev/v5/broker"
"go-micro.dev/v5/cmd"
"go-micro.dev/v5/errors"
"go-micro.dev/v5/logger"
meta "go-micro.dev/v5/metadata"
"go-micro.dev/v5/registry"
"go-micro.dev/v5/server"
"go-micro.dev/v5/internal/util/addr"
"go-micro.dev/v5/internal/util/backoff"
mgrpc "go-micro.dev/v5/internal/util/grpc"
mnet "go-micro.dev/v5/internal/util/net"
"go-micro.dev/v5/logger"
meta "go-micro.dev/v5/metadata"
"go-micro.dev/v5/registry"
"go-micro.dev/v5/server"
"golang.org/x/net/netutil"

"google.golang.org/grpc"
Expand Down Expand Up @@ -213,6 +213,19 @@ func (g *grpcServer) getGrpcServer() *grpc.Server {
return nil
}

func (g *grpcServer) getGracefulStopTimeout() time.Duration {
if g.opts.Context == nil {
return time.Second
}

timeout, ok := g.opts.Context.Value(gracefulStopTimeoutKey{}).(time.Duration)
if !ok || timeout <= 0 {
return time.Second
}

return timeout
}

func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
if g.wg != nil {
g.wg.Add(1)
Expand Down Expand Up @@ -970,23 +983,27 @@ func (g *grpcServer) Start() error {
log.Log(logger.ErrorLevel, "Server deregister error: ", err)
}

// wait for waitgroup
if g.wg != nil {
g.wg.Wait()
}

// stop the grpc server
exit := make(chan bool)
gracefulStopTimeout := g.getGracefulStopTimeout()
exit := make(chan struct{})

go func() {
g.srv.GracefulStop()
close(exit)
}()

timer := time.NewTimer(gracefulStopTimeout)
defer timer.Stop()

select {
case <-exit:
case <-time.After(time.Second):
case <-timer.C:
log.Logf(logger.ErrorLevel, "gRPC Server graceful stop timed out after %s, forcing stop", gracefulStopTimeout)
g.srv.Stop()
<-exit
}

if g.wg != nil {
g.wg.Wait()
}

log.Logf(logger.InfoLevel, "Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
Expand Down
Loading