Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (c *Client) receiveLoop() error {

// createStream creates a new stream and registers it with the client
// Introduce stream types for multiple or single response
func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
func (c *Client) createStream(flags uint8, b []byte, recvBuf int) (*stream, error) {
// sendLock must be held across both allocation of the stream ID and sending it across the wire.
// This ensures that new stream IDs sent on the wire are always increasing, which is a
// requirement of the TTRPC protocol.
Expand Down Expand Up @@ -417,7 +417,7 @@ func (c *Client) createStream(flags uint8, b []byte) (*stream, error) {
default:
}

s = newStream(c.nextStreamID, c)
s = newStream(c.nextStreamID, c, recvBuf)
c.streams[s.id] = s
c.nextStreamID = c.nextStreamID + 2

Expand Down Expand Up @@ -517,7 +517,7 @@ func (c *Client) NewStream(ctx context.Context, desc *StreamDesc, service, metho
} else {
flags = flagRemoteClosed
}
s, err := c.createStream(flags, p)
s, err := c.createStream(flags, p, streamRecvBufferSize)
if err != nil {
return nil, err
}
Expand All @@ -536,7 +536,7 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
return err
}

s, err := c.createStream(0, p)
s, err := c.createStream(0, p, 1)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ var (

// ErrStreamClosed is when the streaming connection is closed.
ErrStreamClosed = errors.New("ttrpc: stream closed")

// ErrStreamFull is returned when a stream's receive buffer is full
// and the message cannot be delivered without blocking the
// connection's receive loop. This prevents a single unconsumed
// stream from deadlocking all other streams on the same connection.
ErrStreamFull = errors.New("ttrpc: stream buffer full")
)

// OversizedMessageErr is used to indicate refusal to send an oversized message.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/containerd/ttrpc

go 1.22
go 1.23

require (
github.com/containerd/log v0.1.0
Expand Down
24 changes: 23 additions & 1 deletion services.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"os"
"path"
"time"
"unsafe"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -128,10 +129,14 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
StreamingClient: stream.StreamingClient,
StreamingServer: stream.StreamingServer,
}
recvBuf := streamRecvBufferSize
if !stream.StreamingClient {
recvBuf = 1
}
sh := &streamHandler{
ctx: ctx,
respond: respond,
recv: make(chan Unmarshaler, 5),
recv: make(chan Unmarshaler, recvBuf),
info: info,
}
go func() {
Expand All @@ -158,6 +163,12 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
return nil, status.Errorf(codes.Unimplemented, "method %v", req.Method)
}

// streamRecvBufferSize is the buffer size for stream recv channels. It
// should be large enough to absorb normal bursts without hitting the
// 1-second timeout fallback in receive/data, but small enough that
// per-stream memory overhead stays trivial.
const streamRecvBufferSize = 64

type streamHandler struct {
ctx context.Context
respond func(*status.Status, []byte, bool, bool) error
Expand All @@ -184,6 +195,17 @@ func (s *streamHandler) data(unmarshal Unmarshaler) error {
return nil
case <-s.ctx.Done():
return s.ctx.Err()
default:
// If recv channel is full, wait up to a second for an item
// to drain and unblock, otherwise return an error.
select {
case s.recv <- unmarshal:
return nil
case <-s.ctx.Done():
return s.ctx.Err()
case <-time.After(time.Second):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timer leak pre-Go 1.23.

return ErrStreamFull
}
}
}

Expand Down
24 changes: 22 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ttrpc
import (
"context"
"sync"
"time"
)

type streamID uint32
Expand All @@ -38,11 +39,11 @@ type stream struct {
recvClose chan struct{}
}

func newStream(id streamID, send sender) *stream {
func newStream(id streamID, send sender, recvBuf int) *stream {
return &stream{
id: id,
sender: send,
recv: make(chan *streamMessage, 1),
recv: make(chan *streamMessage, recvBuf),
recvClose: make(chan struct{}),
}
}
Expand All @@ -63,6 +64,11 @@ func (s *stream) send(mt messageType, flags uint8, b []byte) error {
return s.sender.send(uint32(s.id), mt, flags, b)
}

// receive delivers a message to this stream from the connection receive loop.
// If the stream's recv buffer is full, it waits up to 1 second for the
// consumer to make progress. This keeps the receive loop moving for other
// streams while still providing backpressure under normal operation. If the
// timeout expires the stream is closed with ErrStreamFull.
func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
select {
case <-s.recvClose:
Expand All @@ -76,6 +82,20 @@ func (s *stream) receive(ctx context.Context, msg *streamMessage) error {
return nil
case <-ctx.Done():
return ctx.Err()
default:
// If recv channel is full, wait up to a second for an item
// to drain and unblock, otherwise close the stream.
select {
case <-s.recvClose:
return s.recvErr
case s.recv <- msg:
return nil
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

s.closeWithError(ErrStreamFull)
return ErrStreamFull
}
}
}

Expand Down
Loading
Loading