From a9d0cb11d73d3df95b4a7abba6702ec8a824e3da Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Jun 2026 13:39:39 +0200 Subject: [PATCH 1/7] feat: add SOC fields headers and wrapped chunk caching Add `Swarm-Soc-Fields` header to allow clients to request specific SOC fields (address, recoveredpubkey, identifier, signature, wrappedaddress, span, payload) in GSOC WebSocket messages. Add `Swarm-Cache-Wrapped-Chunk` header to enable caching of wrapped chunks on the node. Update GSOC handler to pass full SOC object instead of just payload, enabling access to all chunk properties. Adjust WebSocket buffer sizes to accommodate maximum SOC fields message size. --- pkg/api/api.go | 3 + pkg/api/gsoc.go | 149 +++++++++++++++++++++++++++++++++++++++++++---- pkg/gsoc/gsoc.go | 7 ++- pkg/soc/soc.go | 4 ++ 4 files changed, 150 insertions(+), 13 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index acd838a3ff6..729f5a0812d 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -93,6 +93,8 @@ const ( SwarmActTimestampHeader = "Swarm-Act-Timestamp" SwarmActPublisherHeader = "Swarm-Act-Publisher" SwarmActHistoryAddressHeader = "Swarm-Act-History-Address" + SwarmSocFieldsHeader = "Swarm-Soc-Fields" + SwarmCacheWrappedChunkHeader = "Swarm-Cache-Wrapped-Chunk" ImmutableHeader = "Immutable" GasPriceHeader = "Gas-Price" @@ -583,6 +585,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler { SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader, SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader, SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader, + SwarmSocFieldsHeader, SwarmCacheWrappedChunkHeader, } allowedHeadersStr := strings.Join(allowedHeaders, ", ") diff --git a/pkg/api/gsoc.go b/pkg/api/gsoc.go index 60d048ffdc0..f879603a71f 100644 --- a/pkg/api/gsoc.go +++ b/pkg/api/gsoc.go @@ -5,15 +5,108 @@ package api import ( + "bytes" + "context" + "fmt" "net/http" + "slices" + "strings" "time" "github.com/ethersphere/bee/v2/pkg/jsonhttp" + "github.com/ethersphere/bee/v2/pkg/soc" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/gorilla/mux" "github.com/gorilla/websocket" ) +// SOC field identifiers that can be requested through the SwarmSocFieldsHeader +// to be serialized and channeled on every incoming GSOC chunk. +const ( + socFieldAddress = "address" + socFieldRecoveredPubKey = "recoveredpubkey" + socFieldIdentifier = "identifier" + socFieldSignature = "signature" + socFieldWrappedAddress = "wrappedaddress" + socFieldSpan = "span" + socFieldPayload = "payload" +) + +var validSocFields = []string{ + socFieldAddress, + socFieldRecoveredPubKey, + socFieldIdentifier, + socFieldSignature, + socFieldWrappedAddress, + socFieldSpan, + socFieldPayload, +} + +// maxSocFieldsSize is the maximum size of a serialized SOC fields message when +// every field is requested: the whole single owner chunk (identifier + +// signature + span + payload, i.e. SocMaxChunkSize) plus the derived metadata +// fields that are not part of the chunk on the wire (soc address, recovered +// public key and wrapped chunk address). +const maxSocFieldsSize = swarm.SocMaxChunkSize + + swarm.HashSize + // soc address + soc.OwnerPubKeySize + // recovered public key + swarm.HashSize // wrapped chunk address + +// parseSocFields parses the SwarmSocFieldsHeader value into a list of SOC field +// identifiers. When the header is empty it defaults to the payload field only, +// which preserves backward compatibility. +func parseSocFields(header string) ([]string, error) { + if strings.TrimSpace(header) == "" { + return []string{socFieldPayload}, nil + } + + parts := strings.Split(header, ",") + fields := make([]string, 0, len(parts)) + for _, p := range parts { + f := strings.ToLower(strings.TrimSpace(p)) + if f == "" { + continue + } + if !slices.Contains(validSocFields, f) { + return nil, fmt.Errorf("unknown soc field: %q", p) + } + fields = append(fields, f) + } + if len(fields) == 0 { + return []string{socFieldPayload}, nil + } + return fields, nil +} + +// socFieldsBytes serializes the requested SOC fields in the same order as they +// were provided in the header. +func socFieldsBytes(c *soc.SOC, fields []string) ([]byte, error) { + buf := bytes.NewBuffer(nil) + for _, f := range fields { + switch f { + case socFieldAddress: + addr, err := c.Address() + if err != nil { + return nil, fmt.Errorf("soc address: %w", err) + } + buf.Write(addr.Bytes()) + case socFieldRecoveredPubKey: + buf.Write(c.OwnerPubKey()) + case socFieldIdentifier: + buf.Write(c.ID()) + case socFieldSignature: + buf.Write(c.Signature()) + case socFieldWrappedAddress: + buf.Write(c.WrappedChunk().Address().Bytes()) + case socFieldSpan: + buf.Write(c.WrappedChunk().Data()[:swarm.SpanSize]) + case socFieldPayload: + buf.Write(c.WrappedChunk().Data()[swarm.SpanSize:]) + } + } + return buf.Bytes(), nil +} + func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { logger := s.logger.WithName("gsoc_subscribe").Build() @@ -26,9 +119,31 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { return } + headers := struct { + SocFields string `map:"Swarm-Soc-Fields"` + CacheWrappedChunk bool `map:"Swarm-Cache-Wrapped-Chunk"` + }{} + if response := s.mapStructure(r.Header, &headers); response != nil { + response("invalid header params", logger, w) + return + } + + fields, err := parseSocFields(headers.SocFields) + if err != nil { + logger.Debug("invalid soc fields header", "error", err) + logger.Error(nil, "invalid soc fields header") + jsonhttp.BadRequest(w, "invalid soc fields header") + return + } + upgrader := websocket.Upgrader{ - ReadBufferSize: swarm.ChunkSize, - WriteBufferSize: swarm.ChunkSize, + ReadBufferSize: swarm.SocMaxChunkSize, + // WriteBufferSize is only an I/O buffer hint; it does not cap the + // message size. The serialized output can be the whole single owner + // chunk plus the derived metadata fields (soc address, recovered public + // key, wrapped chunk address), so size it to that maximum to avoid split + // writes. + WriteBufferSize: maxSocFieldsSize, CheckOrigin: s.checkOrigin, } @@ -41,25 +156,39 @@ func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { } s.wsWg.Add(1) - go s.gsocListeningWs(conn, paths.Address) + go s.gsocListeningWs(conn, paths.Address, fields, headers.CacheWrappedChunk) } -func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) { +func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address, fields []string, cacheWrappedChunk bool) { defer s.wsWg.Done() var ( - dataC = make(chan []byte) - gone = make(chan struct{}) - ticker = time.NewTicker(s.WsPingPeriod) - err error + dataC = make(chan []byte) + gone = make(chan struct{}) + ticker = time.NewTicker(s.WsPingPeriod) + ctx, cancel = context.WithCancel(context.Background()) // for storing cached chunks + err error ) defer func() { + cancel() ticker.Stop() _ = conn.Close() }() - cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) { + cleanup := s.gsoc.Subscribe(socAddress, func(c *soc.SOC) { + if cacheWrappedChunk { + if err := s.storer.Cache().Put(ctx, c.WrappedChunk()); err != nil { + s.logger.Debug("gsoc ws: cache wrapped chunk failed", "error", err) + } + } + + b, err := socFieldsBytes(c, fields) + if err != nil { + s.logger.Debug("gsoc ws: serialize soc fields failed", "error", err) + return + } + select { - case dataC <- m: + case dataC <- b: case <-gone: return case <-s.quit: diff --git a/pkg/gsoc/gsoc.go b/pkg/gsoc/gsoc.go index 41e4f54ac2c..343bf24aeaf 100644 --- a/pkg/gsoc/gsoc.go +++ b/pkg/gsoc/gsoc.go @@ -13,8 +13,9 @@ import ( ) // Handler defines code to be executed upon reception of a GSOC sub message. -// it is used as a parameter definition. -type Handler func([]byte) +// it is used as a parameter definition. It receives the recovered single owner +// chunk so the consumer has access to all of its properties. +type Handler func(*soc.SOC) type Listener interface { Subscribe(address swarm.Address, handler Handler) (cleanup func()) @@ -73,7 +74,7 @@ func (l *listener) Handle(c *soc.SOC) { for _, hh := range h { go func(hh Handler) { - hh(c.WrappedChunk().Data()[swarm.SpanSize:]) + hh(c) }(*hh) } } diff --git a/pkg/soc/soc.go b/pkg/soc/soc.go index 28ade83eeb8..09a6cd81c2e 100644 --- a/pkg/soc/soc.go +++ b/pkg/soc/soc.go @@ -20,6 +20,10 @@ var ( errWrongChunkSize = errors.New("soc: chunk length is less than minimum") ) +// OwnerPubKeySize is the byte length of a compressed secp256k1 public key, +// as returned by crypto.EncodeSecp256k1PublicKey. +const OwnerPubKeySize = 33 + // ID is a SOC identifier type ID []byte From 73be3f6dec0b71c22c5ff7508658d6b7b1962e1b Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Jun 2026 13:41:44 +0200 Subject: [PATCH 2/7] test: add SOC fields and wrapped chunk caching Add tests for `Swarm-Soc-Fields` header to verify requesting specific SOC fields (identifier, wrappedAddress, payload) and full wrapped chunk data (span + payload). Add test for `Swarm-Cache-Wrapped-Chunk` header to verify wrapped chunks are cached and retrievable. Update test helpers to support custom headers and return storer instance. Update gsoc handler signature to accept full SOC object instead of payload bytes. --- pkg/api/gsoc_test.go | 121 +++++++++++++++++++++++++++++++++++++++++- pkg/gsoc/gsoc_test.go | 6 +-- 2 files changed, 123 insertions(+), 4 deletions(-) diff --git a/pkg/api/gsoc_test.go b/pkg/api/gsoc_test.go index cf3161a9f03..6ab7e6daf9c 100644 --- a/pkg/api/gsoc_test.go +++ b/pkg/api/gsoc_test.go @@ -5,13 +5,17 @@ package api_test import ( + "bytes" + "context" "encoding/hex" "fmt" + "net/http" "net/url" "strings" "testing" "time" + "github.com/ethersphere/bee/v2/pkg/api" "github.com/ethersphere/bee/v2/pkg/cac" "github.com/ethersphere/bee/v2/pkg/crypto" "github.com/ethersphere/bee/v2/pkg/gsoc" @@ -134,7 +138,121 @@ func TestGsocPong(t *testing.T) { } } +// TestGsocWebsocketWrappedChunkData verifies that the Swarm-Soc-Fields header +// allows requesting the whole wrapped chunk data (span + payload). +func TestGsocWebsocketWrappedChunkData(t *testing.T) { + t.Parallel() + + var ( + id = make([]byte, 32) + headers = http.Header{api.SwarmSocFieldsHeader: []string{"span,payload"}} + g, cl, signer, _, _ = newGsocTestWithOpts(t, id, 0, headers) + respC = make(chan error, 1) + payload = []byte("The most dangerous phrase in the language is: ‘We've always done it this way.’") + ) + + err := cl.SetReadDeadline(time.Now().Add(longTimeout)) + if err != nil { + t.Fatal(err) + } + cl.SetReadLimit(swarm.ChunkSize) + + ch, _ := cac.New(payload) + socCh := soc.New(id, ch) + signedCh, _ := socCh.Sign(signer) + socCh, _ = soc.FromChunk(signedCh) + g.Handle(socCh) + + // span (8 bytes) + payload == full wrapped chunk data + go expectMessage(t, cl, respC, ch.Data()) + if err := <-respC; err != nil { + t.Fatal(err) + } +} + +// TestGsocWebsocketSocFields verifies that multiple SOC fields are serialized in +// the order they are provided in the Swarm-Soc-Fields header. +func TestGsocWebsocketSocFields(t *testing.T) { + t.Parallel() + + var ( + id = make([]byte, 32) + headers = http.Header{api.SwarmSocFieldsHeader: []string{"identifier,wrappedAddress,payload"}} + g, cl, signer, _, _ = newGsocTestWithOpts(t, id, 0, headers) + respC = make(chan error, 1) + payload = []byte("The future is already here — it's just not evenly distributed.") + ) + + err := cl.SetReadDeadline(time.Now().Add(longTimeout)) + if err != nil { + t.Fatal(err) + } + cl.SetReadLimit(swarm.ChunkSize) + + ch, _ := cac.New(payload) + socCh := soc.New(id, ch) + signedCh, _ := socCh.Sign(signer) + socCh, _ = soc.FromChunk(signedCh) + g.Handle(socCh) + + expected := make([]byte, 0) + expected = append(expected, id...) + expected = append(expected, ch.Address().Bytes()...) + expected = append(expected, payload...) + + go expectMessage(t, cl, respC, expected) + if err := <-respC; err != nil { + t.Fatal(err) + } +} + +// TestGsocWebsocketCacheWrappedChunk verifies that the Swarm-Cache-Wrapped-Chunk +// header causes the wrapped chunk to be stored in the cache so that it can be +// resolved through the bytes endpoint. +func TestGsocWebsocketCacheWrappedChunk(t *testing.T) { + t.Parallel() + + var ( + id = make([]byte, 32) + headers = http.Header{api.SwarmCacheWrappedChunkHeader: []string{"true"}} + g, cl, signer, _, storer = newGsocTestWithOpts(t, id, 0, headers) + respC = make(chan error, 1) + payload = []byte("If you don't like change, you're going to like irrelevance even less.") + ) + + err := cl.SetReadDeadline(time.Now().Add(longTimeout)) + if err != nil { + t.Fatal(err) + } + cl.SetReadLimit(swarm.ChunkSize) + + ch, _ := cac.New(payload) + socCh := soc.New(id, ch) + signedCh, _ := socCh.Sign(signer) + socCh, _ = soc.FromChunk(signedCh) + g.Handle(socCh) + + go expectMessage(t, cl, respC, payload) + if err := <-respC; err != nil { + t.Fatal(err) + } + + got, err := storer.ChunkStore().Get(context.Background(), ch.Address()) + if err != nil { + t.Fatalf("wrapped chunk not cached: %v", err) + } + if !bytes.Equal(got.Data(), ch.Data()) { + t.Fatal("cached wrapped chunk data mismatch") + } +} + func newGsocTest(t *testing.T, socId []byte, pingPeriod time.Duration) (gsoc.Listener, *websocket.Conn, crypto.Signer, string) { + t.Helper() + g, cl, signer, listener, _ := newGsocTestWithOpts(t, socId, pingPeriod, nil) + return g, cl, signer, listener +} + +func newGsocTestWithOpts(t *testing.T, socId []byte, pingPeriod time.Duration, headers http.Header) (gsoc.Listener, *websocket.Conn, crypto.Signer, string, api.Storer) { t.Helper() if pingPeriod == 0 { pingPeriod = 10 * time.Second @@ -161,11 +279,12 @@ func newGsocTest(t *testing.T, socId []byte, pingPeriod time.Duration) (gsoc.Lis _, cl, listener, _ := newTestServer(t, testServerOptions{ Gsoc: gsoc, WsPath: fmt.Sprintf("/gsoc/subscribe/%s", hex.EncodeToString(chunkAddr.Bytes())), + WsHeaders: headers, Storer: storer, BatchStore: batchStore, Logger: log.Noop, WsPingPeriod: pingPeriod, }) - return gsoc, cl, signer, listener + return gsoc, cl, signer, listener, storer } diff --git a/pkg/gsoc/gsoc_test.go b/pkg/gsoc/gsoc_test.go index dc49b0809a8..9beb892da51 100644 --- a/pkg/gsoc/gsoc_test.go +++ b/pkg/gsoc/gsoc_test.go @@ -37,17 +37,17 @@ func TestRegister(t *testing.T) { address1, _ = soc.CreateAddress(socId1, owner.Bytes()) address2, _ = soc.CreateAddress(socId2, owner.Bytes()) - h1 = func(m []byte) { + h1 = func(*soc.SOC) { h1Calls++ msgChan <- struct{}{} } - h2 = func(m []byte) { + h2 = func(*soc.SOC) { h2Calls++ msgChan <- struct{}{} } - h3 = func(m []byte) { + h3 = func(*soc.SOC) { h3Calls++ msgChan <- struct{}{} } From 4a0c65e38a5eba346360a07459368a35b4750bfa Mon Sep 17 00:00:00 2001 From: nugaon Date: Mon, 8 Jun 2026 13:44:33 +0200 Subject: [PATCH 3/7] docs: openapi with minor version bump --- openapi/Swarm.yaml | 12 ++++++++++-- openapi/SwarmCommon.yaml | 23 +++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/openapi/Swarm.yaml b/openapi/Swarm.yaml index 9a0eafc96f1..0db1a05b9e3 100644 --- a/openapi/Swarm.yaml +++ b/openapi/Swarm.yaml @@ -1,7 +1,7 @@ openapi: 3.0.3 info: - version: 8.1.0 + version: 8.2.0 title: Bee API description: "API endpoints for interacting with the Swarm network, supporting file operations, messaging, and node management" @@ -859,9 +859,17 @@ paths: $ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress" required: true description: "Single Owner Chunk address (which may have multiple payloads)" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmSocFieldsParameter" + - $ref: "SwarmCommon.yaml#/components/parameters/SwarmCacheWrappedChunkParameter" responses: "200": - description: Establishes a WebSocket subscription for incoming messages on the Single Owner Chunk address + description: > + Establishes a WebSocket subscription for incoming messages on the + Single Owner Chunk address. Each message is the binary serialization + of the Single Owner Chunk fields requested through the + swarm-soc-fields header (defaults to the wrapped chunk payload). + "400": + $ref: "SwarmCommon.yaml#/components/responses/400" "500": $ref: "SwarmCommon.yaml#/components/responses/500" default: diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index ffcbbac3b8e..7ef301980f5 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1144,6 +1144,29 @@ components: required: false description: Associate upload with an existing Tag UID + SwarmSocFieldsParameter: + in: header + name: swarm-soc-fields + schema: + type: string + required: false + description: > + Comma separated list of Single Owner Chunk fields to be serialized and + channeled on every incoming GSOC message, in the given order. Allowed + values are: address, recoveredPubKey, identifier, signature, + wrappedAddress, span, payload. When omitted it defaults to "payload". + + SwarmCacheWrappedChunkParameter: + in: header + name: swarm-cache-wrapped-chunk + schema: + type: boolean + required: false + description: > + Indicates whether the wrapped chunk of every incoming GSOC message should + be cached locally so that it can be resolved through the bytes endpoint + (useful when the single owner chunk wraps a root chunk larger than 4KB). + SwarmPinParameter: in: header name: swarm-pin From 48dcc0b473d6577e6ce7f0d2c2297f6c9fd68409 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 9 Jun 2026 10:06:27 +0200 Subject: [PATCH 4/7] fix: linting --- pkg/api/gsoc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/gsoc_test.go b/pkg/api/gsoc_test.go index 6ab7e6daf9c..f20e6fe782d 100644 --- a/pkg/api/gsoc_test.go +++ b/pkg/api/gsoc_test.go @@ -195,7 +195,7 @@ func TestGsocWebsocketSocFields(t *testing.T) { socCh, _ = soc.FromChunk(signedCh) g.Handle(socCh) - expected := make([]byte, 0) + expected := make([]byte, 0, len(id)+swarm.HashSize+len(payload)) expected = append(expected, id...) expected = append(expected, ch.Address().Bytes()...) expected = append(expected, payload...) From 03e859e70824fb85a90d7b0b9586cd90b56196c1 Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 9 Jun 2026 14:17:44 +0200 Subject: [PATCH 5/7] refactor: warning log instead of debug --- pkg/api/gsoc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/api/gsoc.go b/pkg/api/gsoc.go index f879603a71f..e3da611850f 100644 --- a/pkg/api/gsoc.go +++ b/pkg/api/gsoc.go @@ -183,7 +183,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address b, err := socFieldsBytes(c, fields) if err != nil { - s.logger.Debug("gsoc ws: serialize soc fields failed", "error", err) + s.logger.Warning("gsoc ws: serialize soc fields failed", "error", err) return } From 46186ef1c6f94f1b7179dd4bee52371c5c193d6b Mon Sep 17 00:00:00 2001 From: nugaon Date: Tue, 9 Jun 2026 16:59:03 +0200 Subject: [PATCH 6/7] refactor: increase msg buffer to 2 and slow connection handling --- pkg/api/gsoc.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/api/gsoc.go b/pkg/api/gsoc.go index e3da611850f..047ca5bb5ef 100644 --- a/pkg/api/gsoc.go +++ b/pkg/api/gsoc.go @@ -163,7 +163,7 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address defer s.wsWg.Done() var ( - dataC = make(chan []byte) + dataC = make(chan []byte, 2) // small buffer to decouple producer/consumer gone = make(chan struct{}) ticker = time.NewTicker(s.WsPingPeriod) ctx, cancel = context.WithCancel(context.Background()) // for storing cached chunks @@ -193,6 +193,13 @@ func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address return case <-s.quit: return + default: + s.logger.Warning("gsoc ws: slow consumer, closing connection") + _ = conn.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "slow consumer"), + time.Now().Add(writeDeadline)) + _ = conn.Close() + return } }) From 216db9b5526ded9f7d3ffbb2004dc15262f47880 Mon Sep 17 00:00:00 2001 From: nugaon Date: Wed, 10 Jun 2026 11:03:20 +0200 Subject: [PATCH 7/7] docs: openapi random access desctiption --- openapi/SwarmCommon.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openapi/SwarmCommon.yaml b/openapi/SwarmCommon.yaml index 7ef301980f5..7803e1ff646 100644 --- a/openapi/SwarmCommon.yaml +++ b/openapi/SwarmCommon.yaml @@ -1155,6 +1155,8 @@ components: channeled on every incoming GSOC message, in the given order. Allowed values are: address, recoveredPubKey, identifier, signature, wrappedAddress, span, payload. When omitted it defaults to "payload". + In order to have random access on the response bytes define payload + as the last field in the list since it has variable length. SwarmCacheWrappedChunkParameter: in: header