diff --git a/consumer/consumer.go b/consumer/consumer.go index 4ed49400..8e094f22 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -892,8 +892,7 @@ func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.Messa } // TODO: add computPullFromWhichFilterServer - - return dc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest) + return dc.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest) } func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) { diff --git a/internal/remote/future.go b/internal/remote/future.go index a7d32684..5822783d 100644 --- a/internal/remote/future.go +++ b/internal/remote/future.go @@ -19,7 +19,8 @@ package remote import ( "context" - "github.com/apache/rocketmq-client-go/v2/errors" + "errors" + e2 "github.com/apache/rocketmq-client-go/v2/errors" "sync" ) @@ -61,7 +62,10 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) { case <-r.Done: cmd, err = r.ResponseCommand, r.Err case <-r.ctx.Done(): - err = errors.ErrRequestTimeout + err = r.ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + err = e2.ErrRequestTimeout + } r.Err = err } return cmd, err