diff --git a/go.mod b/go.mod index bbd07c7d0..43204a265 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/panjf2000/ants/v2 v2.12.0 github.com/pkg/errors v0.9.1 github.com/pkg/profile v1.7.0 - github.com/rabbitmq/amqp091-go v1.10.0 + github.com/rabbitmq/amqp091-go v1.11.0 github.com/rs/zerolog v1.35.0 github.com/russross/blackfriday/v2 v2.1.0 github.com/sirupsen/logrus v1.9.4 diff --git a/go.sum b/go.sum index d3f4c9969..82f385686 100644 --- a/go.sum +++ b/go.sum @@ -323,8 +323,8 @@ github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9Z github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0= github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= -github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= -github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rabbitmq/amqp091-go v1.11.0 h1:HxIctVm9Gid/Vtn706necmZ7Wj6pgGI2eqplRbEY8O8= +github.com/rabbitmq/amqp091-go v1.11.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= diff --git a/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml b/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml index 4341bcf98..4f25c6a22 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml +++ b/vendor/github.com/rabbitmq/amqp091-go/.golangci.yml @@ -1,3 +1,42 @@ +version: "2" run: build-tags: - integration +linters: + default: standard + enable: + - modernize + disable: + - testpackage + - testableexamples + - godox + - gochecknoinits + settings: + modernize: + disable: + # enable when + - minmax + exclusions: + generated: lax + presets: + - comments + - common-false-positives + - legacy + - std-error-handling + paths: + - third_party$ + - builtin$ + - examples$ + rules: + - linters: + - funlen + - gocognit + - cyclop + source: "integration_test.go" +formatters: + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md index fd03c1f9b..2f8aa2328 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md +++ b/vendor/github.com/rabbitmq/amqp091-go/CHANGELOG.md @@ -1,5 +1,57 @@ # Changelog +## [v1.11.0](https://github.com/rabbitmq/amqp091-go/tree/v1.11.0) (2026-04-21) + +[Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.10.0...v1.11.0) + +**Implemented enhancements:** + +- add better debug information on DialConfig [\#245](https://github.com/rabbitmq/amqp091-go/issues/245) + +**Fixed bugs:** + +- Channel error when acking via go-routines [\#296](https://github.com/rabbitmq/amqp091-go/issues/296) + +**Closed issues:** + +- PR \#318 exposes a pre-existing race in `Connection.Close()`. [\#327](https://github.com/rabbitmq/amqp091-go/issues/327) +- Entire header frame isn't always read [\#309](https://github.com/rabbitmq/amqp091-go/issues/309) +- Incomplete support of 0-9-1 field type values [\#302](https://github.com/rabbitmq/amqp091-go/issues/302) +- Redelivered Flag Not Exposed [\#301](https://github.com/rabbitmq/amqp091-go/issues/301) +- consume input basicConsumeOk but response queueBindOk [\#291](https://github.com/rabbitmq/amqp091-go/issues/291) +- Channel is closed after Channel.ExchangeDeclarePassive fails [\#290](https://github.com/rabbitmq/amqp091-go/issues/290) +- Incomplete example in \(\*Channel\).QueueBind documentation [\#279](https://github.com/rabbitmq/amqp091-go/issues/279) +- QueueDeclarePassive does not report queue type mismatch [\#273](https://github.com/rabbitmq/amqp091-go/issues/273) +- Release 1.10.0 [\#261](https://github.com/rabbitmq/amqp091-go/issues/261) +- Update minimum Go version to 1.18 [\#146](https://github.com/rabbitmq/amqp091-go/issues/146) + +**Merged pull requests:** + +- fix: respect context cancellation on publishing with context operations [\#330](https://github.com/rabbitmq/amqp091-go/pull/330) ([NawafSwe](https://github.com/NawafSwe)) +- Eliminate race condition in Connection.Close\(\) and related methods [\#328](https://github.com/rabbitmq/amqp091-go/pull/328) ([Zerpet](https://github.com/Zerpet)) +- Bump the github-actions group with 4 updates [\#326](https://github.com/rabbitmq/amqp091-go/pull/326) ([dependabot[bot]](https://github.com/apps/dependabot)) +- Bump github/codeql-action from 3 to 4 [\#321](https://github.com/rabbitmq/amqp091-go/pull/321) ([dependabot[bot]](https://github.com/apps/dependabot)) +- Fix incomplete routing diagram in QueueBind doc comment [\#320](https://github.com/rabbitmq/amqp091-go/pull/320) ([Copilot](https://github.com/apps/copilot-swe-agent)) +- Use RabbitMQ 4 in Makefile [\#319](https://github.com/rabbitmq/amqp091-go/pull/319) ([Zerpet](https://github.com/Zerpet)) +- refactor: simplify with atomic types [\#318](https://github.com/rabbitmq/amqp091-go/pull/318) ([alexandear](https://github.com/alexandear)) +- Add support for unsigned type values [\#317](https://github.com/rabbitmq/amqp091-go/pull/317) ([Zerpet](https://github.com/Zerpet)) +- fix: modernize lint issues [\#315](https://github.com/rabbitmq/amqp091-go/pull/315) ([alexandear](https://github.com/alexandear)) +- Fix `parseHeaderFrame` to consume entire frame payload [\#314](https://github.com/rabbitmq/amqp091-go/pull/314) ([lukebakken](https://github.com/lukebakken)) +- docs: update link to RabbitMQ tutorials [\#313](https://github.com/rabbitmq/amqp091-go/pull/313) ([alexandear](https://github.com/alexandear)) +- fix: typos in comments and tests [\#312](https://github.com/rabbitmq/amqp091-go/pull/312) ([alexandear](https://github.com/alexandear)) +- feat: add MIME types constants for content types [\#308](https://github.com/rabbitmq/amqp091-go/pull/308) ([YlanzinhoY](https://github.com/YlanzinhoY)) +- Fix linter error after migrating config to v2 [\#306](https://github.com/rabbitmq/amqp091-go/pull/306) ([Zerpet](https://github.com/Zerpet)) +- Investigate GH-296 [\#297](https://github.com/rabbitmq/amqp091-go/pull/297) ([lukebakken](https://github.com/lukebakken)) +- Return existing error instead of creating new for the same purpose [\#295](https://github.com/rabbitmq/amqp091-go/pull/295) ([pingvincible](https://github.com/pingvincible)) +- Add warning about concurrency with Channels [\#294](https://github.com/rabbitmq/amqp091-go/pull/294) ([Zerpet](https://github.com/Zerpet)) +- Expose delivery not initialised error [\#293](https://github.com/rabbitmq/amqp091-go/pull/293) ([Zerpet](https://github.com/Zerpet)) +- fix: unify receiver methods to avoid conflicts between value and pointer types [\#292](https://github.com/rabbitmq/amqp091-go/pull/292) ([Raisul191491](https://github.com/Raisul191491)) +- Fixing simple errors [\#280](https://github.com/rabbitmq/amqp091-go/pull/280) ([korolev-d-l](https://github.com/korolev-d-l)) +- Add test that demonstrates the issue [\#274](https://github.com/rabbitmq/amqp091-go/pull/274) ([lukebakken](https://github.com/lukebakken)) +- chore: doc typo [\#269](https://github.com/rabbitmq/amqp091-go/pull/269) ([AndrewWinterman](https://github.com/AndrewWinterman)) +- Small fixes and refactors [\#266](https://github.com/rabbitmq/amqp091-go/pull/266) ([peczenyj](https://github.com/peczenyj)) +- add methods Temporary and Recoverable to amqp.Error [\#265](https://github.com/rabbitmq/amqp091-go/pull/265) ([peczenyj](https://github.com/peczenyj)) + ## [v1.10.0](https://github.com/rabbitmq/amqp091-go/tree/v1.10.0) (2024-05-08) [Full Changelog](https://github.com/rabbitmq/amqp091-go/compare/v1.9.0...v1.10.0) diff --git a/vendor/github.com/rabbitmq/amqp091-go/Makefile b/vendor/github.com/rabbitmq/amqp091-go/Makefile index 7dc71bc5f..8b25fee7e 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/Makefile +++ b/vendor/github.com/rabbitmq/amqp091-go/Makefile @@ -34,7 +34,7 @@ CONTAINER_NAME ?= amqp091-go-rabbitmq rabbitmq-server: ## Start a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit docker run --detach --rm --name $(CONTAINER_NAME) \ --publish 5672:5672 --publish 15672:15672 \ - --pull always rabbitmq:3-management + --pull always rabbitmq:4-management .PHONY: stop-rabbitmq-server stop-rabbitmq-server: ## Stop a RabbitMQ server using Docker. Container name can be customised with CONTAINER_NAME=some-rabbit diff --git a/vendor/github.com/rabbitmq/amqp091-go/README.md b/vendor/github.com/rabbitmq/amqp091-go/README.md index 6d3143f67..1214ddb19 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/README.md +++ b/vendor/github.com/rabbitmq/amqp091-go/README.md @@ -90,7 +90,7 @@ please file an issue. ## Documentation * [Godoc API reference](http://godoc.org/github.com/rabbitmq/amqp091-go) - * [RabbitMQ tutorials in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go) + * [RabbitMQ tutorials in Go](https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/go) ## Contributing diff --git a/vendor/github.com/rabbitmq/amqp091-go/allocator.go b/vendor/github.com/rabbitmq/amqp091-go/allocator.go index f2925e742..e4aa90081 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/allocator.go +++ b/vendor/github.com/rabbitmq/amqp091-go/allocator.go @@ -6,9 +6,9 @@ package amqp091 import ( - "bytes" "fmt" "math/big" + "strings" ) const ( @@ -42,9 +42,9 @@ func newAllocator(low, high int) *allocator { // "allocator[low..high] reserved..until" // // O(N) where N is high-low -func (a allocator) String() string { - b := &bytes.Buffer{} - fmt.Fprintf(b, "allocator[%d..%d]", a.low, a.high) +func (a *allocator) String() string { + var b strings.Builder + fmt.Fprintf(&b, "allocator[%d..%d]", a.low, a.high) for low := a.low; low <= a.high; low++ { high := low @@ -53,9 +53,9 @@ func (a allocator) String() string { } if high > low+1 { - fmt.Fprintf(b, " %d..%d", low, high-1) + fmt.Fprintf(&b, " %d..%d", low, high-1) } else if high > low { - fmt.Fprintf(b, " %d", high-1) + fmt.Fprintf(&b, " %d", high-1) } low = high diff --git a/vendor/github.com/rabbitmq/amqp091-go/auth.go b/vendor/github.com/rabbitmq/amqp091-go/auth.go index 0c07bb3ec..6df30f7f8 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/auth.go +++ b/vendor/github.com/rabbitmq/amqp091-go/auth.go @@ -55,8 +55,7 @@ func (auth *AMQPlainAuth) Response() string { } // ExternalAuth for RabbitMQ-auth-mechanism-ssl. -type ExternalAuth struct { -} +type ExternalAuth struct{} // Mechanism returns "EXTERNAL" func (*ExternalAuth) Mechanism() string { @@ -70,7 +69,6 @@ func (*ExternalAuth) Response() string { // Finds the first mechanism preferred by the client that the server supports. func pickSASLMechanism(client []Authentication, serverMechanisms []string) (auth Authentication, ok bool) { - for _, auth = range client { for _, mech := range serverMechanisms { if auth.Mechanism() == mech { diff --git a/vendor/github.com/rabbitmq/amqp091-go/channel.go b/vendor/github.com/rabbitmq/amqp091-go/channel.go index 3dfd7faf9..e6f9d014b 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/channel.go +++ b/vendor/github.com/rabbitmq/amqp091-go/channel.go @@ -38,8 +38,8 @@ type Channel struct { id uint16 - // closed is set to 1 when the channel has been closed - see Channel.send() - closed int32 + // closed is set to true when the channel has been closed - see Channel.send() + closed atomic.Bool close chan struct{} // true when we will never notify again @@ -92,7 +92,7 @@ func newChannel(c *Connection, id uint16) *Channel { // Signal that from now on, Channel.send() should call Channel.sendClosed() func (ch *Channel) setClosed() { - atomic.StoreInt32(&ch.closed, 1) + ch.closed.Store(true) } // shutdown is called by Connection after the channel has been removed from the @@ -307,7 +307,7 @@ func (ch *Channel) sendOpen(msg message) (err error) { func (ch *Channel) dispatch(msg message) { switch m := msg.(type) { case *channelClose: - // Note: channel state is set to closed immedately after the message is + // Note: channel state is set to closed immediately after the message is // decoded by the Connection // lock before sending connection.close-ok @@ -488,7 +488,7 @@ func (ch *Channel) Close() error { // IsClosed returns true if the channel is marked as closed, otherwise false // is returned. func (ch *Channel) IsClosed() bool { - return atomic.LoadInt32(&ch.closed) == 1 + return ch.closed.Load() } /* @@ -926,8 +926,7 @@ exchange and queue, the attempt to rebind will be ignored and the existing binding will be retained. In the case that multiple bindings may cause the message to be routed to the -same queue, the server will only route the publishing once. This is possible -with topic exchanges. +same queue, the server will route the publishing to all queues that match. QueueBind("pagers", "alert", "amq.topic", false, nil) QueueBind("emails", "info", "amq.topic", false, nil) @@ -936,6 +935,7 @@ with topic exchanges. Delivery Exchange Key Queue ----------------------------------------------- key: alert --> amq.topic ----> alert --> pagers + \---> # ------> emails key: info ---> amq.topic ----> # ------> emails \---> info ---/ key: debug --> amq.topic ----> # ------> emails @@ -1492,7 +1492,10 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg /* PublishWithContext sends a Publishing from the client to an exchange on the server. -NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured. +If the context is already cancelled when PublishWithContext is called, it +returns the context error immediately without attempting to publish. Context +cancellation after the call has started does not interrupt an in-flight +Publish, as the underlying I/O is not context-aware. When you want a single message to be delivered to a single queue, you can publish to the default exchange with the routingKey of the queue name. This is @@ -1523,8 +1526,13 @@ confirmations start at 1. Exit when all publishings are confirmed. When Publish does not return an error and the channel is in confirm mode, the internal counter for DeliveryTags with the first confirmation starts at 1. */ -func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { - return ch.Publish(exchange, key, mandatory, immediate, msg) +func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return ch.Publish(exchange, key, mandatory, immediate, msg) + } } /* @@ -1583,11 +1591,18 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation for this message. If the channel has not been put into confirm mode, the DeferredConfirmation will be nil. -NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed -to this function is not honoured. +If the context is already cancelled when PublishWithDeferredConfirmWithContext is called, it +returns the context error immediately without attempting to publish. Context +cancellation after the call has started does not interrupt an in-flight +PublishWithDeferredConfirm, as the underlying I/O is not context-aware. */ -func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { - return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) +func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg) + } } /* @@ -1790,7 +1805,7 @@ it must be redelivered or dropped. See also Delivery.Nack */ -func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error { +func (ch *Channel) Nack(tag uint64, multiple, requeue bool) error { ch.m.Lock() defer ch.m.Unlock() diff --git a/vendor/github.com/rabbitmq/amqp091-go/connection.go b/vendor/github.com/rabbitmq/amqp091-go/connection.go index e167a23fc..3d483c973 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/connection.go +++ b/vendor/github.com/rabbitmq/amqp091-go/connection.go @@ -28,7 +28,7 @@ const ( defaultHeartbeat = 10 * time.Second defaultConnectionTimeout = 30 * time.Second defaultProduct = "AMQP 0.9.1 Client" - buildVersion = "1.10.0" + buildVersion = "1.11.0" platform = "golang" // Safer default that makes channel leaks a lot easier to spot // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593. @@ -93,7 +93,8 @@ func NewConnectionProperties() Table { // multiplexed on this channel. There must always be active receivers for // every asynchronous message on this connection. type Connection struct { - destructor sync.Once // shutdown once + destructor sync.Once // teardown: notify listeners, close channels and the socket + closeOnce sync.Once // handshake: send one `connection.close` frame sendM sync.Mutex // conn writer mutex m sync.Mutex // struct field mutex @@ -122,7 +123,7 @@ type Connection struct { Properties Table // Server properties Locales []string // Server locales - closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic + closed atomic.Bool // Will be true if the connection is closed, false otherwise. } type readDeadliner interface { @@ -423,14 +424,23 @@ func (c *Connection) Close() error { return ErrClosed } - defer c.shutdown(nil) - return c.call( - &connectionClose{ - ReplyCode: replySuccess, - ReplyText: "kthxbai", - }, - &connectionCloseOk{}, - ) + var handshakeErr error + var initiated bool + c.closeOnce.Do(func() { + initiated = true + defer c.shutdown(nil) + handshakeErr = c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) + }) + if !initiated { + return ErrClosed + } + return handshakeErr } // CloseDeadline requests and waits for the response to close this AMQP connection. @@ -451,20 +461,27 @@ func (c *Connection) CloseDeadline(deadline time.Time) error { return ErrClosed } - defer c.shutdown(nil) - - err := c.setDeadline(deadline) - if err != nil { - return err + var handshakeErr error + var initiated bool + c.closeOnce.Do(func() { + initiated = true + defer c.shutdown(nil) + if err := c.setDeadline(deadline); err != nil { + handshakeErr = err + return + } + handshakeErr = c.call( + &connectionClose{ + ReplyCode: replySuccess, + ReplyText: "kthxbai", + }, + &connectionCloseOk{}, + ) + }) + if !initiated { + return ErrClosed } - - return c.call( - &connectionClose{ - ReplyCode: replySuccess, - ReplyText: "kthxbai", - }, - &connectionCloseOk{}, - ) + return handshakeErr } func (c *Connection) closeWith(err *Error) error { @@ -472,21 +489,29 @@ func (c *Connection) closeWith(err *Error) error { return ErrClosed } - defer c.shutdown(err) - - return c.call( - &connectionClose{ - ReplyCode: uint16(err.Code), - ReplyText: err.Reason, - }, - &connectionCloseOk{}, - ) + var handshakeErr error + var initiated bool + c.closeOnce.Do(func() { + initiated = true + defer c.shutdown(err) + handshakeErr = c.call( + &connectionClose{ + ReplyCode: uint16(err.Code), + ReplyText: err.Reason, + }, + &connectionCloseOk{}, + ) + }) + if !initiated { + return ErrClosed + } + return handshakeErr } // IsClosed returns true if the connection is marked as closed, otherwise false // is returned. func (c *Connection) IsClosed() bool { - return atomic.LoadInt32(&c.closed) == 1 + return c.closed.Load() } // setDeadline is a wrapper to type assert Connection.conn and set an I/O @@ -598,7 +623,7 @@ func (c *Connection) flush() (err error) { } func (c *Connection) shutdown(err *Error) { - atomic.StoreInt32(&c.closed, 1) + c.closed.Store(true) c.destructor.Do(func() { c.m.Lock() @@ -675,7 +700,6 @@ func (c *Connection) dispatch0(f frame) { return case c.rpc <- m: } - } case *heartbeatFrame: // kthx - all reads reset our deadline. so we can drop this @@ -755,7 +779,6 @@ func (c *Connection) reader(r io.Reader) { for { frame, err := frames.ReadFrame() - if err != nil { c.shutdown(&Error{Code: FrameError, Reason: err.Error()}) return @@ -899,13 +922,17 @@ func (c *Connection) closeChannel(ch *Channel, e *Error) { Channel opens a unique, concurrent server channel to process the bulk of AMQP messages. Any error from methods on this receiver will render the receiver invalid and a new Channel should be opened. + +Channels are not thread-safe. To avoid unexpected behavior, do not share +a single Channel instance between multiple goroutines. Concurrent calls +to Channel methods may result in race conditions or unpredictable outcomes. */ func (c *Connection) Channel() (*Channel, error) { return c.openChannel() } func (c *Connection) call(req message, res ...message) error { - // Special case for when the protocol header frame is sent insted of a + // Special case for when the protocol header frame is sent instead of a // request method if req != nil { if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil { diff --git a/vendor/github.com/rabbitmq/amqp091-go/consumers.go b/vendor/github.com/rabbitmq/amqp091-go/consumers.go index c352fece9..395869303 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/consumers.go +++ b/vendor/github.com/rabbitmq/amqp091-go/consumers.go @@ -12,7 +12,7 @@ import ( "sync/atomic" ) -var consumerSeq uint64 +var consumerSeq atomic.Uint64 const consumerTagLengthMax = 0xFF // see writeShortstr @@ -23,7 +23,7 @@ func uniqueConsumerTag() string { func commandNameBasedUniqueConsumerTag(commandName string) string { tagPrefix := "ctag-" tagInfix := commandName - tagSuffix := "-" + strconv.FormatUint(atomic.AddUint64(&consumerSeq, 1), 10) + tagSuffix := "-" + strconv.FormatUint(consumerSeq.Add(1), 10) if len(tagPrefix)+len(tagInfix)+len(tagSuffix) > consumerTagLengthMax { tagInfix = "streadway/amqp" @@ -55,7 +55,7 @@ func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) { defer close(out) defer subs.Done() - var inflight = in + inflight := in var queue []*Delivery for delivery := range in { diff --git a/vendor/github.com/rabbitmq/amqp091-go/delivery.go b/vendor/github.com/rabbitmq/amqp091-go/delivery.go index e94cf3437..40e7efdcf 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/delivery.go +++ b/vendor/github.com/rabbitmq/amqp091-go/delivery.go @@ -10,7 +10,7 @@ import ( "time" ) -var errDeliveryNotInitialized = errors.New("delivery not initialized") +var ErrDeliveryNotInitialized = errors.New("delivery not initialized. Channel is probably closed") // Acknowledger notifies the server of successful or failed consumption of // deliveries via identifier found in the Delivery.DeliveryTag field. @@ -18,7 +18,7 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized") // Applications can provide mock implementations in tests of Delivery handlers. type Acknowledger interface { Ack(tag uint64, multiple bool) error - Nack(tag uint64, multiple bool, requeue bool) error + Nack(tag uint64, multiple, requeue bool) error Reject(tag uint64, requeue bool) error } @@ -122,7 +122,7 @@ delivery that is not automatically acknowledged. */ func (d Delivery) Ack(multiple bool) error { if d.Acknowledger == nil { - return errDeliveryNotInitialized + return ErrDeliveryNotInitialized } return d.Acknowledger.Ack(d.DeliveryTag, multiple) } @@ -142,7 +142,7 @@ delivery that is not automatically acknowledged. */ func (d Delivery) Reject(requeue bool) error { if d.Acknowledger == nil { - return errDeliveryNotInitialized + return ErrDeliveryNotInitialized } return d.Acknowledger.Reject(d.DeliveryTag, requeue) } @@ -167,7 +167,7 @@ delivery that is not automatically acknowledged. */ func (d Delivery) Nack(multiple, requeue bool) error { if d.Acknowledger == nil { - return errDeliveryNotInitialized + return ErrDeliveryNotInitialized } return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue) } diff --git a/vendor/github.com/rabbitmq/amqp091-go/log.go b/vendor/github.com/rabbitmq/amqp091-go/log.go index 7540f137a..539fbef28 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/log.go +++ b/vendor/github.com/rabbitmq/amqp091-go/log.go @@ -5,7 +5,7 @@ package amqp091 type Logging interface { - Printf(format string, v ...interface{}) + Printf(format string, v ...any) } var Logger Logging = NullLogger{} @@ -16,8 +16,7 @@ func SetLogger(logger Logging) { Logger = logger } -type NullLogger struct { -} +type NullLogger struct{} -func (l NullLogger) Printf(format string, v ...interface{}) { +func (l NullLogger) Printf(format string, v ...any) { } diff --git a/vendor/github.com/rabbitmq/amqp091-go/read.go b/vendor/github.com/rabbitmq/amqp091-go/read.go index a8bed1379..ac1f8eb9e 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/read.go +++ b/vendor/github.com/rabbitmq/amqp091-go/read.go @@ -140,7 +140,7 @@ func readTimestamp(r io.Reader) (v time.Time, err error) { } /* -'A': []interface{} +'A': any 'D': Decimal 'F': Table 'I': int32 @@ -152,11 +152,13 @@ func readTimestamp(r io.Reader) (v time.Time, err error) { 'd': float64 'f': float32 'l': int64 +'i': uint32 's': int16 +'u': uint16 't': bool 'x': []byte */ -func readField(r io.Reader) (v interface{}, err error) { +func readField(r io.Reader) (v any, err error) { var typ byte if err = binary.Read(r, binary.BigEndian, &typ); err != nil { return @@ -205,6 +207,20 @@ func readField(r io.Reader) (v interface{}, err error) { } return value, nil + case 'u': + var value uint16 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + + case 'i': + var value uint32 + if err = binary.Read(r, binary.BigEndian, &value); err != nil { + return + } + return value, nil + case 'f': var value float32 if err = binary.Read(r, binary.BigEndian, &value); err != nil { @@ -269,13 +285,13 @@ func readTable(r io.Reader) (table Table, err error) { return } - nested.Write([]byte(str)) + nested.WriteString(str) table = make(Table) for nested.Len() > 0 { var key string - var value interface{} + var value any if key, err = readShortstr(&nested); err != nil { return @@ -291,11 +307,8 @@ func readTable(r io.Reader) (table Table, err error) { return } -func readArray(r io.Reader) ([]interface{}, error) { - var ( - size uint32 - err error - ) +func readArray(r io.Reader) (arr []any, err error) { + var size uint32 if err = binary.Read(r, binary.BigEndian, &size); err != nil { return nil, err @@ -303,8 +316,7 @@ func readArray(r io.Reader) ([]interface{}, error) { var ( lim = &io.LimitedReader{R: r, N: int64(size)} - arr []interface{} - field interface{} + field any ) for { @@ -330,91 +342,101 @@ func (r *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err ChannelId: channel, } - if err = binary.Read(r.r, binary.BigEndian, &hf.ClassId); err != nil { + lim := &io.LimitedReader{R: r.r, N: int64(size)} + + if err = binary.Read(lim, binary.BigEndian, &hf.ClassId); err != nil { return } - if err = binary.Read(r.r, binary.BigEndian, &hf.weight); err != nil { + if err = binary.Read(lim, binary.BigEndian, &hf.weight); err != nil { return } - if err = binary.Read(r.r, binary.BigEndian, &hf.Size); err != nil { + if err = binary.Read(lim, binary.BigEndian, &hf.Size); err != nil { return } var flags uint16 - if err = binary.Read(r.r, binary.BigEndian, &flags); err != nil { + if err = binary.Read(lim, binary.BigEndian, &flags); err != nil { return } if hasProperty(flags, flagContentType) { - if hf.Properties.ContentType, err = readShortstr(r.r); err != nil { + if hf.Properties.ContentType, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagContentEncoding) { - if hf.Properties.ContentEncoding, err = readShortstr(r.r); err != nil { + if hf.Properties.ContentEncoding, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagHeaders) { - if hf.Properties.Headers, err = readTable(r.r); err != nil { + if hf.Properties.Headers, err = readTable(lim); err != nil { return } } if hasProperty(flags, flagDeliveryMode) { - if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil { + if err = binary.Read(lim, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil { return } } if hasProperty(flags, flagPriority) { - if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.Priority); err != nil { + if err = binary.Read(lim, binary.BigEndian, &hf.Properties.Priority); err != nil { return } } if hasProperty(flags, flagCorrelationId) { - if hf.Properties.CorrelationId, err = readShortstr(r.r); err != nil { + if hf.Properties.CorrelationId, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagReplyTo) { - if hf.Properties.ReplyTo, err = readShortstr(r.r); err != nil { + if hf.Properties.ReplyTo, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagExpiration) { - if hf.Properties.Expiration, err = readShortstr(r.r); err != nil { + if hf.Properties.Expiration, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagMessageId) { - if hf.Properties.MessageId, err = readShortstr(r.r); err != nil { + if hf.Properties.MessageId, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagTimestamp) { - if hf.Properties.Timestamp, err = readTimestamp(r.r); err != nil { + if hf.Properties.Timestamp, err = readTimestamp(lim); err != nil { return } } if hasProperty(flags, flagType) { - if hf.Properties.Type, err = readShortstr(r.r); err != nil { + if hf.Properties.Type, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagUserId) { - if hf.Properties.UserId, err = readShortstr(r.r); err != nil { + if hf.Properties.UserId, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagAppId) { - if hf.Properties.AppId, err = readShortstr(r.r); err != nil { + if hf.Properties.AppId, err = readShortstr(lim); err != nil { return } } if hasProperty(flags, flagReserved1) { - if hf.Properties.reserved1, err = readShortstr(r.r); err != nil { + if hf.Properties.reserved1, err = readShortstr(lim); err != nil { + return + } + } + + // Drain any bytes remaining in the frame payload that were not consumed + // by property parsing (e.g. padding added by other AMQP implementations). + if lim.N > 0 { + if _, err = io.CopyN(io.Discard, lim, lim.N); err != nil { return } } @@ -435,7 +457,7 @@ func (r *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err e return bf, nil } -var errHeartbeatPayload = errors.New("Heartbeats should not have a payload") +var errHeartbeatPayload = errors.New("heartbeats should not have a payload") func (r *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) { hf := &heartbeatFrame{ diff --git a/vendor/github.com/rabbitmq/amqp091-go/return.go b/vendor/github.com/rabbitmq/amqp091-go/return.go index cdc3875ed..29bbeaf60 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/return.go +++ b/vendor/github.com/rabbitmq/amqp091-go/return.go @@ -25,7 +25,7 @@ type Return struct { DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2) Priority uint8 // queue implementation use - 0 to 9 CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to to reply to (ex: RPC) + ReplyTo string // application use - address to reply to (ex: RPC) Expiration string // implementation use - message expiration spec MessageId string // application use - message identifier Timestamp time.Time // application use - message timestamp diff --git a/vendor/github.com/rabbitmq/amqp091-go/spec091.go b/vendor/github.com/rabbitmq/amqp091-go/spec091.go index 6e02ba997..cc24cf41f 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/spec091.go +++ b/vendor/github.com/rabbitmq/amqp091-go/spec091.go @@ -84,7 +84,6 @@ func (msg *connectionStart) wait() bool { } func (msg *connectionStart) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.VersionMajor); err != nil { return } @@ -107,7 +106,6 @@ func (msg *connectionStart) write(w io.Writer) (err error) { } func (msg *connectionStart) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.VersionMajor); err != nil { return } @@ -145,7 +143,6 @@ func (msg *connectionStartOk) wait() bool { } func (msg *connectionStartOk) write(w io.Writer) (err error) { - if err = writeTable(w, msg.ClientProperties); err != nil { return } @@ -166,7 +163,6 @@ func (msg *connectionStartOk) write(w io.Writer) (err error) { } func (msg *connectionStartOk) read(r io.Reader) (err error) { - if msg.ClientProperties, err = readTable(r); err != nil { return } @@ -199,7 +195,6 @@ func (msg *connectionSecure) wait() bool { } func (msg *connectionSecure) write(w io.Writer) (err error) { - if err = writeLongstr(w, msg.Challenge); err != nil { return } @@ -208,7 +203,6 @@ func (msg *connectionSecure) write(w io.Writer) (err error) { } func (msg *connectionSecure) read(r io.Reader) (err error) { - if msg.Challenge, err = readLongstr(r); err != nil { return } @@ -229,7 +223,6 @@ func (msg *connectionSecureOk) wait() bool { } func (msg *connectionSecureOk) write(w io.Writer) (err error) { - if err = writeLongstr(w, msg.Response); err != nil { return } @@ -238,7 +231,6 @@ func (msg *connectionSecureOk) write(w io.Writer) (err error) { } func (msg *connectionSecureOk) read(r io.Reader) (err error) { - if msg.Response, err = readLongstr(r); err != nil { return } @@ -261,7 +253,6 @@ func (msg *connectionTune) wait() bool { } func (msg *connectionTune) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.ChannelMax); err != nil { return } @@ -278,7 +269,6 @@ func (msg *connectionTune) write(w io.Writer) (err error) { } func (msg *connectionTune) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.ChannelMax); err != nil { return } @@ -309,7 +299,6 @@ func (msg *connectionTuneOk) wait() bool { } func (msg *connectionTuneOk) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.ChannelMax); err != nil { return } @@ -326,7 +315,6 @@ func (msg *connectionTuneOk) write(w io.Writer) (err error) { } func (msg *connectionTuneOk) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.ChannelMax); err != nil { return } @@ -408,7 +396,6 @@ func (msg *connectionOpenOk) wait() bool { } func (msg *connectionOpenOk) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.reserved1); err != nil { return } @@ -417,7 +404,6 @@ func (msg *connectionOpenOk) write(w io.Writer) (err error) { } func (msg *connectionOpenOk) read(r io.Reader) (err error) { - if msg.reserved1, err = readShortstr(r); err != nil { return } @@ -441,7 +427,6 @@ func (msg *connectionClose) wait() bool { } func (msg *connectionClose) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { return } @@ -461,7 +446,6 @@ func (msg *connectionClose) write(w io.Writer) (err error) { } func (msg *connectionClose) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { return } @@ -480,8 +464,7 @@ func (msg *connectionClose) read(r io.Reader) (err error) { return } -type connectionCloseOk struct { -} +type connectionCloseOk struct{} func (msg *connectionCloseOk) id() (uint16, uint16) { return 10, 51 @@ -492,12 +475,10 @@ func (msg *connectionCloseOk) wait() bool { } func (msg *connectionCloseOk) write(w io.Writer) (err error) { - return } func (msg *connectionCloseOk) read(r io.Reader) (err error) { - return } @@ -514,7 +495,6 @@ func (msg *connectionBlocked) wait() bool { } func (msg *connectionBlocked) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.Reason); err != nil { return } @@ -523,7 +503,6 @@ func (msg *connectionBlocked) write(w io.Writer) (err error) { } func (msg *connectionBlocked) read(r io.Reader) (err error) { - if msg.Reason, err = readShortstr(r); err != nil { return } @@ -531,8 +510,7 @@ func (msg *connectionBlocked) read(r io.Reader) (err error) { return } -type connectionUnblocked struct { -} +type connectionUnblocked struct{} func (msg *connectionUnblocked) id() (uint16, uint16) { return 10, 61 @@ -543,12 +521,10 @@ func (msg *connectionUnblocked) wait() bool { } func (msg *connectionUnblocked) write(w io.Writer) (err error) { - return } func (msg *connectionUnblocked) read(r io.Reader) (err error) { - return } @@ -566,7 +542,6 @@ func (msg *connectionUpdateSecret) wait() bool { } func (msg *connectionUpdateSecret) write(w io.Writer) (err error) { - if err = writeLongstr(w, msg.NewSecret); err != nil { return } @@ -579,7 +554,6 @@ func (msg *connectionUpdateSecret) write(w io.Writer) (err error) { } func (msg *connectionUpdateSecret) read(r io.Reader) (err error) { - if msg.NewSecret, err = readLongstr(r); err != nil { return } @@ -591,8 +565,7 @@ func (msg *connectionUpdateSecret) read(r io.Reader) (err error) { return } -type connectionUpdateSecretOk struct { -} +type connectionUpdateSecretOk struct{} func (msg *connectionUpdateSecretOk) id() (uint16, uint16) { return 10, 71 @@ -603,12 +576,10 @@ func (msg *connectionUpdateSecretOk) wait() bool { } func (msg *connectionUpdateSecretOk) write(w io.Writer) (err error) { - return } func (msg *connectionUpdateSecretOk) read(r io.Reader) (err error) { - return } @@ -625,7 +596,6 @@ func (msg *channelOpen) wait() bool { } func (msg *channelOpen) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.reserved1); err != nil { return } @@ -634,7 +604,6 @@ func (msg *channelOpen) write(w io.Writer) (err error) { } func (msg *channelOpen) read(r io.Reader) (err error) { - if msg.reserved1, err = readShortstr(r); err != nil { return } @@ -655,7 +624,6 @@ func (msg *channelOpenOk) wait() bool { } func (msg *channelOpenOk) write(w io.Writer) (err error) { - if err = writeLongstr(w, msg.reserved1); err != nil { return } @@ -664,7 +632,6 @@ func (msg *channelOpenOk) write(w io.Writer) (err error) { } func (msg *channelOpenOk) read(r io.Reader) (err error) { - if msg.reserved1, err = readLongstr(r); err != nil { return } @@ -762,7 +729,6 @@ func (msg *channelClose) wait() bool { } func (msg *channelClose) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { return } @@ -782,7 +748,6 @@ func (msg *channelClose) write(w io.Writer) (err error) { } func (msg *channelClose) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { return } @@ -801,8 +766,7 @@ func (msg *channelClose) read(r io.Reader) (err error) { return } -type channelCloseOk struct { -} +type channelCloseOk struct{} func (msg *channelCloseOk) id() (uint16, uint16) { return 20, 41 @@ -813,12 +777,10 @@ func (msg *channelCloseOk) wait() bool { } func (msg *channelCloseOk) write(w io.Writer) (err error) { - return } func (msg *channelCloseOk) read(r io.Reader) (err error) { - return } @@ -917,8 +879,7 @@ func (msg *exchangeDeclare) read(r io.Reader) (err error) { return } -type exchangeDeclareOk struct { -} +type exchangeDeclareOk struct{} func (msg *exchangeDeclareOk) id() (uint16, uint16) { return 40, 11 @@ -929,12 +890,10 @@ func (msg *exchangeDeclareOk) wait() bool { } func (msg *exchangeDeclareOk) write(w io.Writer) (err error) { - return } func (msg *exchangeDeclareOk) read(r io.Reader) (err error) { - return } @@ -999,8 +958,7 @@ func (msg *exchangeDelete) read(r io.Reader) (err error) { return } -type exchangeDeleteOk struct { -} +type exchangeDeleteOk struct{} func (msg *exchangeDeleteOk) id() (uint16, uint16) { return 40, 21 @@ -1011,12 +969,10 @@ func (msg *exchangeDeleteOk) wait() bool { } func (msg *exchangeDeleteOk) write(w io.Writer) (err error) { - return } func (msg *exchangeDeleteOk) read(r io.Reader) (err error) { - return } @@ -1098,8 +1054,7 @@ func (msg *exchangeBind) read(r io.Reader) (err error) { return } -type exchangeBindOk struct { -} +type exchangeBindOk struct{} func (msg *exchangeBindOk) id() (uint16, uint16) { return 40, 31 @@ -1110,12 +1065,10 @@ func (msg *exchangeBindOk) wait() bool { } func (msg *exchangeBindOk) write(w io.Writer) (err error) { - return } func (msg *exchangeBindOk) read(r io.Reader) (err error) { - return } @@ -1197,8 +1150,7 @@ func (msg *exchangeUnbind) read(r io.Reader) (err error) { return } -type exchangeUnbindOk struct { -} +type exchangeUnbindOk struct{} func (msg *exchangeUnbindOk) id() (uint16, uint16) { return 40, 51 @@ -1209,12 +1161,10 @@ func (msg *exchangeUnbindOk) wait() bool { } func (msg *exchangeUnbindOk) write(w io.Writer) (err error) { - return } func (msg *exchangeUnbindOk) read(r io.Reader) (err error) { - return } @@ -1321,7 +1271,6 @@ func (msg *queueDeclareOk) wait() bool { } func (msg *queueDeclareOk) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.Queue); err != nil { return } @@ -1337,7 +1286,6 @@ func (msg *queueDeclareOk) write(w io.Writer) (err error) { } func (msg *queueDeclareOk) read(r io.Reader) (err error) { - if msg.Queue, err = readShortstr(r); err != nil { return } @@ -1430,8 +1378,7 @@ func (msg *queueBind) read(r io.Reader) (err error) { return } -type queueBindOk struct { -} +type queueBindOk struct{} func (msg *queueBindOk) id() (uint16, uint16) { return 50, 21 @@ -1442,12 +1389,10 @@ func (msg *queueBindOk) wait() bool { } func (msg *queueBindOk) write(w io.Writer) (err error) { - return } func (msg *queueBindOk) read(r io.Reader) (err error) { - return } @@ -1468,7 +1413,6 @@ func (msg *queueUnbind) wait() bool { } func (msg *queueUnbind) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.reserved1); err != nil { return } @@ -1491,7 +1435,6 @@ func (msg *queueUnbind) write(w io.Writer) (err error) { } func (msg *queueUnbind) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.reserved1); err != nil { return } @@ -1513,8 +1456,7 @@ func (msg *queueUnbind) read(r io.Reader) (err error) { return } -type queueUnbindOk struct { -} +type queueUnbindOk struct{} func (msg *queueUnbindOk) id() (uint16, uint16) { return 50, 51 @@ -1525,12 +1467,10 @@ func (msg *queueUnbindOk) wait() bool { } func (msg *queueUnbindOk) write(w io.Writer) (err error) { - return } func (msg *queueUnbindOk) read(r io.Reader) (err error) { - return } @@ -1602,7 +1542,6 @@ func (msg *queuePurgeOk) wait() bool { } func (msg *queuePurgeOk) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { return } @@ -1611,7 +1550,6 @@ func (msg *queuePurgeOk) write(w io.Writer) (err error) { } func (msg *queuePurgeOk) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { return } @@ -1699,7 +1637,6 @@ func (msg *queueDeleteOk) wait() bool { } func (msg *queueDeleteOk) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.MessageCount); err != nil { return } @@ -1708,7 +1645,6 @@ func (msg *queueDeleteOk) write(w io.Writer) (err error) { } func (msg *queueDeleteOk) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.MessageCount); err != nil { return } @@ -1771,8 +1707,7 @@ func (msg *basicQos) read(r io.Reader) (err error) { return } -type basicQosOk struct { -} +type basicQosOk struct{} func (msg *basicQosOk) id() (uint16, uint16) { return 60, 11 @@ -1783,12 +1718,10 @@ func (msg *basicQosOk) wait() bool { } func (msg *basicQosOk) write(w io.Writer) (err error) { - return } func (msg *basicQosOk) read(r io.Reader) (err error) { - return } @@ -1894,7 +1827,6 @@ func (msg *basicConsumeOk) wait() bool { } func (msg *basicConsumeOk) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.ConsumerTag); err != nil { return } @@ -1903,7 +1835,6 @@ func (msg *basicConsumeOk) write(w io.Writer) (err error) { } func (msg *basicConsumeOk) read(r io.Reader) (err error) { - if msg.ConsumerTag, err = readShortstr(r); err != nil { return } @@ -1970,7 +1901,6 @@ func (msg *basicCancelOk) wait() bool { } func (msg *basicCancelOk) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.ConsumerTag); err != nil { return } @@ -1979,7 +1909,6 @@ func (msg *basicCancelOk) write(w io.Writer) (err error) { } func (msg *basicCancelOk) read(r io.Reader) (err error) { - if msg.ConsumerTag, err = readShortstr(r); err != nil { return } @@ -2091,7 +2020,6 @@ func (msg *basicReturn) setContent(props properties, body []byte) { } func (msg *basicReturn) write(w io.Writer) (err error) { - if err = binary.Write(w, binary.BigEndian, msg.ReplyCode); err != nil { return } @@ -2110,7 +2038,6 @@ func (msg *basicReturn) write(w io.Writer) (err error) { } func (msg *basicReturn) read(r io.Reader) (err error) { - if err = binary.Read(r, binary.BigEndian, &msg.ReplyCode); err != nil { return } @@ -2358,7 +2285,6 @@ func (msg *basicGetEmpty) wait() bool { } func (msg *basicGetEmpty) write(w io.Writer) (err error) { - if err = writeShortstr(w, msg.reserved1); err != nil { return } @@ -2367,7 +2293,6 @@ func (msg *basicGetEmpty) write(w io.Writer) (err error) { } func (msg *basicGetEmpty) read(r io.Reader) (err error) { - if msg.reserved1, err = readShortstr(r); err != nil { return } @@ -2541,8 +2466,7 @@ func (msg *basicRecover) read(r io.Reader) (err error) { return } -type basicRecoverOk struct { -} +type basicRecoverOk struct{} func (msg *basicRecoverOk) id() (uint16, uint16) { return 60, 111 @@ -2553,12 +2477,10 @@ func (msg *basicRecoverOk) wait() bool { } func (msg *basicRecoverOk) write(w io.Writer) (err error) { - return } func (msg *basicRecoverOk) read(r io.Reader) (err error) { - return } @@ -2614,8 +2536,7 @@ func (msg *basicNack) read(r io.Reader) (err error) { return } -type txSelect struct { -} +type txSelect struct{} func (msg *txSelect) id() (uint16, uint16) { return 90, 10 @@ -2626,17 +2547,14 @@ func (msg *txSelect) wait() bool { } func (msg *txSelect) write(w io.Writer) (err error) { - return } func (msg *txSelect) read(r io.Reader) (err error) { - return } -type txSelectOk struct { -} +type txSelectOk struct{} func (msg *txSelectOk) id() (uint16, uint16) { return 90, 11 @@ -2647,17 +2565,14 @@ func (msg *txSelectOk) wait() bool { } func (msg *txSelectOk) write(w io.Writer) (err error) { - return } func (msg *txSelectOk) read(r io.Reader) (err error) { - return } -type txCommit struct { -} +type txCommit struct{} func (msg *txCommit) id() (uint16, uint16) { return 90, 20 @@ -2668,17 +2583,14 @@ func (msg *txCommit) wait() bool { } func (msg *txCommit) write(w io.Writer) (err error) { - return } func (msg *txCommit) read(r io.Reader) (err error) { - return } -type txCommitOk struct { -} +type txCommitOk struct{} func (msg *txCommitOk) id() (uint16, uint16) { return 90, 21 @@ -2689,17 +2601,14 @@ func (msg *txCommitOk) wait() bool { } func (msg *txCommitOk) write(w io.Writer) (err error) { - return } func (msg *txCommitOk) read(r io.Reader) (err error) { - return } -type txRollback struct { -} +type txRollback struct{} func (msg *txRollback) id() (uint16, uint16) { return 90, 30 @@ -2710,17 +2619,14 @@ func (msg *txRollback) wait() bool { } func (msg *txRollback) write(w io.Writer) (err error) { - return } func (msg *txRollback) read(r io.Reader) (err error) { - return } -type txRollbackOk struct { -} +type txRollbackOk struct{} func (msg *txRollbackOk) id() (uint16, uint16) { return 90, 31 @@ -2731,12 +2637,10 @@ func (msg *txRollbackOk) wait() bool { } func (msg *txRollbackOk) write(w io.Writer) (err error) { - return } func (msg *txRollbackOk) read(r io.Reader) (err error) { - return } @@ -2777,8 +2681,7 @@ func (msg *confirmSelect) read(r io.Reader) (err error) { return } -type confirmSelectOk struct { -} +type confirmSelectOk struct{} func (msg *confirmSelectOk) id() (uint16, uint16) { return 85, 11 @@ -2789,12 +2692,10 @@ func (msg *confirmSelectOk) wait() bool { } func (msg *confirmSelectOk) write(w io.Writer) (err error) { - return } func (msg *confirmSelectOk) read(r io.Reader) (err error) { - return } diff --git a/vendor/github.com/rabbitmq/amqp091-go/types.go b/vendor/github.com/rabbitmq/amqp091-go/types.go index 1e15ed0ab..1a2d10434 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/types.go +++ b/vendor/github.com/rabbitmq/amqp091-go/types.go @@ -23,6 +23,21 @@ const ( ExchangeHeaders = "headers" ) +// MIME types constants +const ( + MimeTextPlain = "text/plain" + MimeApplicationJSON = "application/json" + MimeApplicationOctetStream = "application/octet-stream" + MimeApplicationXML = "application/xml" + MimeTextXML = "text/xml" + MimeApplicationProtobuf = "application/protobuf" + MimeApplicationXProtobuf = "application/x-protobuf" + MimeApplicationMsgPack = "application/msgpack" + MimeApplicationAvro = "application/avro" + MimeApplicationCloudEventsJSON = "application/cloudevents+json" + MimeApplicationFormURLEncoded = "application/x-www-form-urlencoded" +) + var ( // ErrClosed is returned when the channel or connection is not open ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"} @@ -70,6 +85,8 @@ var ( errInvalidTypeAssertion = &Error{Code: InternalError, Reason: "type assertion unsuccessful", Server: false, Recover: true} ) +var _ error = (*Error)(nil) + // Error captures the code and reason a channel or connection has been closed // by the server. type Error struct { @@ -88,10 +105,54 @@ func newError(code uint16, text string) *Error { } } -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason) } +// Recoverable returns true if the error can be recovered by retrying later or with different parameters. +// Returns the value of the Recover field. +func (e *Error) Recoverable() bool { + return e.Recover +} + +// Temporary returns true if the error can be recovered by retrying later with the same parameters. +// +// The following are the codes which might be resolved by retry without external +// action, according to the AMQP 0.91 spec +// (https://www.rabbitmq.com/amqp-0-9-1-reference.html#constants). The quotations +// are from that page. +// +// ContentTooLarge (311) +// "The client attempted to transfer content larger than the server could +// accept at the present time. The client may retry at a later time." +// +// ConnectionForced (320) +// "An operator intervened to close the connection for some reason. The +// client may retry at some later date." +func (e *Error) Temporary() bool { + // amqp.Error has a Recover field which sounds like it should mean "retryable". + // But it actually means "can be recovered by retrying later or with different + // parameters," which is not what we want. The error codes for which Recover is + // true, defined in the isSoftExceptionCode function, including things + // like NotFound and AccessRefused, which require outside action. + switch e.Code { + case ContentTooLarge: + return true + + case ConnectionForced: + return true + + default: + return false + } +} + +// GoString returns a longer description of the error than .Error() including all fields. +func (e *Error) GoString() string { + return fmt.Sprintf("Exception=%d, Reason=%q, Recover=%v, Server=%v", + e.Code, e.Reason, e.Recover, e.Server) +} + // Used by header frames to capture routing and header information type properties struct { ContentType string // MIME content type @@ -100,7 +161,7 @@ type properties struct { DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2) Priority uint8 // queue implementation use - 0 to 9 CorrelationId string // application use - correlation identifier - ReplyTo string // application use - address to to reply to (ex: RPC) + ReplyTo string // application use - address to reply to (ex: RPC) Expiration string // implementation use - message expiration spec MessageId string // application use - message identifier Timestamp time.Time // application use - message timestamp @@ -180,11 +241,11 @@ type Publishing struct { DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) Priority uint8 // 0 to 9 CorrelationId string // correlation identifier - ReplyTo string // address to to reply to (ex: RPC) + ReplyTo string // address to reply to (ex: RPC) // Expiration represents the message TTL in milliseconds. A value of "0" // indicates that the message will immediately expire if the message arrives // at its destination and the message is not directly handled by a consumer - // that currently has the capacatity to do so. If you wish the message to + // that currently has the capacity to do so. If you wish the message to // not expire on its own, set this value to any ttl value, empty string or // use the corresponding constants NeverExpire and ImmediatelyExpire. This // does not influence queue configured TTL values. @@ -326,13 +387,15 @@ const ( // int16 // int32 // int64 +// uint16 +// uint32 // nil // string // time.Time // amqp.Decimal // amqp.Table // []byte -// []interface{} - containing above types +// []any - containing above types // // Functions taking a table will immediately fail when the table contains a // value of an unsupported type. @@ -343,14 +406,14 @@ const ( // Use a type assertion when reading values from a table for type conversion. // // RabbitMQ expects int32 for integer values. -type Table map[string]interface{} +type Table map[string]any -func validateField(f interface{}) error { +func validateField(f any) error { switch fv := f.(type) { - case nil, bool, byte, int8, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time: + case nil, bool, byte, int8, int, int16, int32, int64, uint16, uint32, float32, float64, string, []byte, Decimal, time.Time: return nil - case []interface{}: + case []any: for _, v := range fv { if err := validateField(v); err != nil { return fmt.Errorf("in array %s", err) diff --git a/vendor/github.com/rabbitmq/amqp091-go/uri.go b/vendor/github.com/rabbitmq/amqp091-go/uri.go index ddc4b1adb..d9ecf228d 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/uri.go +++ b/vendor/github.com/rabbitmq/amqp091-go/uri.go @@ -14,8 +14,10 @@ import ( "strings" ) -var errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'") -var errURIWhitespace = errors.New("URI must not contain whitespace") +var ( + errURIScheme = errors.New("AMQP scheme must be either 'amqp://' or 'amqps://'") + errURIWhitespace = errors.New("URI must not contain whitespace") +) var schemePorts = map[string]int{ "amqp": 5672, @@ -164,7 +166,7 @@ func ParseURI(uri string) (URI, error) { if params.Has("channel_max") { value, err := strconv.ParseUint(params.Get("channel_max"), 10, 16) if err != nil { - return builder, fmt.Errorf("connection_timeout is not an integer: %v", err) + return builder, fmt.Errorf("channel_max is not an uint16: %v", err) } builder.ChannelMax = uint16(value) } diff --git a/vendor/github.com/rabbitmq/amqp091-go/write.go b/vendor/github.com/rabbitmq/amqp091-go/write.go index dcec31448..168774e11 100644 --- a/vendor/github.com/rabbitmq/amqp091-go/write.go +++ b/vendor/github.com/rabbitmq/amqp091-go/write.go @@ -10,6 +10,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "math" "time" @@ -90,44 +91,44 @@ func (f *headerFrame) write(w io.Writer) (err error) { var mask uint16 - if len(f.Properties.ContentType) > 0 { - mask = mask | flagContentType + if f.Properties.ContentType != "" { + mask |= flagContentType } - if len(f.Properties.ContentEncoding) > 0 { - mask = mask | flagContentEncoding + if f.Properties.ContentEncoding != "" { + mask |= flagContentEncoding } - if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 { - mask = mask | flagHeaders + if len(f.Properties.Headers) > 0 { + mask |= flagHeaders } if f.Properties.DeliveryMode > 0 { - mask = mask | flagDeliveryMode + mask |= flagDeliveryMode } if f.Properties.Priority > 0 { - mask = mask | flagPriority + mask |= flagPriority } - if len(f.Properties.CorrelationId) > 0 { - mask = mask | flagCorrelationId + if f.Properties.CorrelationId != "" { + mask |= flagCorrelationId } - if len(f.Properties.ReplyTo) > 0 { - mask = mask | flagReplyTo + if f.Properties.ReplyTo != "" { + mask |= flagReplyTo } - if len(f.Properties.Expiration) > 0 { - mask = mask | flagExpiration + if f.Properties.Expiration != "" { + mask |= flagExpiration } - if len(f.Properties.MessageId) > 0 { - mask = mask | flagMessageId + if f.Properties.MessageId != "" { + mask |= flagMessageId } if !f.Properties.Timestamp.IsZero() { - mask = mask | flagTimestamp + mask |= flagTimestamp } - if len(f.Properties.Type) > 0 { - mask = mask | flagType + if f.Properties.Type != "" { + mask |= flagType } - if len(f.Properties.UserId) > 0 { - mask = mask | flagUserId + if f.Properties.UserId != "" { + mask |= flagUserId } - if len(f.Properties.AppId) > 0 { - mask = mask | flagAppId + if f.Properties.AppId != "" { + mask |= flagAppId } if err = binary.Write(&payload, binary.BigEndian, mask); err != nil { @@ -224,7 +225,6 @@ func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err err byte((size & 0x0000ff00) >> 8), byte((size & 0x000000ff) >> 0), }) - if err != nil { return } @@ -243,7 +243,7 @@ func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err err func writeShortstr(w io.Writer, s string) (err error) { b := []byte(s) - var length = uint8(len(b)) + length := uint8(len(b)) if err = binary.Write(w, binary.BigEndian, length); err != nil { return @@ -259,7 +259,7 @@ func writeShortstr(w io.Writer, s string) (err error) { func writeLongstr(w io.Writer, s string) (err error) { b := []byte(s) - var length = uint32(len(b)) + length := uint32(len(b)) if err = binary.Write(w, binary.BigEndian, length); err != nil { return @@ -273,7 +273,7 @@ func writeLongstr(w io.Writer, s string) (err error) { } /* -'A': []interface{} +'A': []any 'D': Decimal 'F': Table 'I': int32 @@ -285,11 +285,13 @@ func writeLongstr(w io.Writer, s string) (err error) { 'd': float64 'f': float32 'l': int64 +'i': uint32 's': int16 +'u': uint16 't': bool 'x': []byte */ -func writeField(w io.Writer, value interface{}) (err error) { +func writeField(w io.Writer, value any) (err error) { var buf [9]byte var enc []byte @@ -333,6 +335,16 @@ func writeField(w io.Writer, value interface{}) (err error) { binary.BigEndian.PutUint64(buf[1:9], uint64(v)) enc = buf[:9] + case uint16: + buf[0] = 'u' + binary.BigEndian.PutUint16(buf[1:3], v) + enc = buf[:3] + + case uint32: + buf[0] = 'i' + binary.BigEndian.PutUint32(buf[1:5], v) + enc = buf[:5] + case float32: buf[0] = 'f' binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v)) @@ -354,7 +366,7 @@ func writeField(w io.Writer, value interface{}) (err error) { binary.BigEndian.PutUint32(buf[1:5], uint32(len(v))) enc = append(buf[:5], []byte(v)...) - case []interface{}: // field-array + case []any: // field-array buf[0] = 'A' sec := new(bytes.Buffer) @@ -410,17 +422,23 @@ func writeField(w io.Writer, value interface{}) (err error) { return } +// writeTable serializes a Table to the given writer. +// It writes each key-value pair and returns the serialized data as a longstr. func writeTable(w io.Writer, table Table) (err error) { var buf bytes.Buffer for key, val := range table { if err = writeShortstr(&buf, key); err != nil { - return + return fmt.Errorf("writing key %q: %w", key, err) } if err = writeField(&buf, val); err != nil { - return + return fmt.Errorf("writing value for key %q: %w", key, err) } } - return writeLongstr(w, buf.String()) + if err := writeLongstr(w, buf.String()); err != nil { + return fmt.Errorf("writing final long string: %w", err) + } + + return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 6fe66254e..3cc0883f8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -398,7 +398,7 @@ github.com/planetscale/vtprotobuf/protohelpers # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/rabbitmq/amqp091-go v1.10.0 +# github.com/rabbitmq/amqp091-go v1.11.0 ## explicit; go 1.20 github.com/rabbitmq/amqp091-go # github.com/rivo/uniseg v0.4.7