Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
openapi: 3.0.3

info:
version: 8.1.0
version: 8.2.0
title: Bee API
description: "API endpoints for interacting with the Swarm network, supporting file operations, messaging, and node management"

Expand Down Expand Up @@ -859,9 +859,17 @@ paths:
$ref: "SwarmCommon.yaml#/components/schemas/SwarmAddress"
required: true
description: "Single Owner Chunk address (which may have multiple payloads)"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmSocFieldsParameter"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmCacheWrappedChunkParameter"
responses:
"200":
description: Establishes a WebSocket subscription for incoming messages on the Single Owner Chunk address
description: >
Establishes a WebSocket subscription for incoming messages on the
Single Owner Chunk address. Each message is the binary serialization
of the Single Owner Chunk fields requested through the
swarm-soc-fields header (defaults to the wrapped chunk payload).
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
default:
Expand Down
23 changes: 23 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,29 @@ components:
required: false
description: Associate upload with an existing Tag UID

SwarmSocFieldsParameter:
in: header
name: swarm-soc-fields
schema:
type: string
required: false
description: >
Comma-separated list of Single Owner Chunk fields to be serialized and
sent for every incoming GSOC message, in the given order. Allowed
values (matched case-insensitively) are: address, recoveredPubKey,
identifier, signature, wrappedAddress, span, payload. When omitted it
defaults to "payload".

SwarmCacheWrappedChunkParameter:
in: header
name: swarm-cache-wrapped-chunk
schema:
type: boolean
required: false
description: >
Indicates whether the wrapped chunk of every incoming GSOC message should
be cached locally so that it can be resolved through the bytes endpoint
(useful when the single owner chunk wraps a root chunk larger than 4KB).

SwarmPinParameter:
in: header
name: swarm-pin
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ const (
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
SwarmActPublisherHeader = "Swarm-Act-Publisher"
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"
SwarmSocFieldsHeader = "Swarm-Soc-Fields"
SwarmCacheWrappedChunkHeader = "Swarm-Cache-Wrapped-Chunk"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
Expand Down Expand Up @@ -583,6 +585,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader,
SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader,
SwarmSocFieldsHeader, SwarmCacheWrappedChunkHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down
149 changes: 139 additions & 10 deletions pkg/api/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,108 @@
package api

import (
"bytes"
"context"
"fmt"
"net/http"
"slices"
"strings"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

// SOC field identifiers that can be requested through the SwarmSocFieldsHeader
// to be serialized and channeled on every incoming GSOC chunk.
//
// The values are all lowercase because parseSocFields lowercases every header
// entry before matching it, which makes the header case-insensitive (e.g.
// "recoveredPubKey" matches socFieldRecoveredPubKey).
const (
// socFieldAddress selects the address of the single owner chunk.
socFieldAddress = "address"
// socFieldRecoveredPubKey selects the owner public key of the chunk.
socFieldRecoveredPubKey = "recoveredpubkey"
// socFieldIdentifier selects the chunk identifier.
socFieldIdentifier = "identifier"
// socFieldSignature selects the chunk signature.
socFieldSignature = "signature"
// socFieldWrappedAddress selects the address of the wrapped chunk.
socFieldWrappedAddress = "wrappedaddress"
// socFieldSpan selects the first swarm.SpanSize bytes of the wrapped chunk data.
socFieldSpan = "span"
// socFieldPayload selects the wrapped chunk data after the span; it is the
// default when no field is requested.
socFieldPayload = "payload"
)

// validSocFields is the allow-list of field identifiers accepted by
// parseSocFields. Any header entry that, once trimmed and lowercased, is not in
// this list is rejected as an unknown field.
var validSocFields = []string{
socFieldAddress,
socFieldRecoveredPubKey,
socFieldIdentifier,
socFieldSignature,
socFieldWrappedAddress,
socFieldSpan,
socFieldPayload,
}

// maxSocFieldsSize is the maximum size of a serialized SOC fields message when
// every field is requested exactly once: the whole single owner chunk
// (identifier + signature + span + payload, i.e. SocMaxChunkSize) plus the
// derived metadata fields that are not part of the chunk on the wire (soc
// address, recovered public key and wrapped chunk address).
//
// NOTE(review): the bound assumes no field is repeated in the request; a
// repeated field would produce a message larger than this value.
const maxSocFieldsSize = swarm.SocMaxChunkSize +
swarm.HashSize + // soc address
soc.OwnerPubKeySize + // recovered public key
swarm.HashSize // wrapped chunk address

// parseSocFields parses the SwarmSocFieldsHeader value into an ordered list of
// SOC field identifiers.
//
// The header is a comma separated list. Every entry is trimmed and lowercased
// before it is matched against validSocFields, so matching is case-insensitive
// and empty entries (for example a trailing comma) are ignored. When the header
// is empty, or holds nothing but empty entries, the result defaults to the
// payload field only, which preserves backward compatibility.
//
// An error is returned for an unknown field and for a field that is listed more
// than once. Rejecting repeats keeps the serialized message bounded by the
// number of distinct fields; without it a header repeating "payload" many times
// would multiply the size of every message pushed to the subscriber.
func parseSocFields(header string) ([]string, error) {
	if strings.TrimSpace(header) == "" {
		return []string{socFieldPayload}, nil
	}

	// At most one entry per valid field can be accepted, so that is the
	// capacity bound regardless of how long the header is.
	fields := make([]string, 0, len(validSocFields))
	for _, p := range strings.Split(header, ",") {
		f := strings.ToLower(strings.TrimSpace(p))
		if f == "" {
			continue
		}
		if !slices.Contains(validSocFields, f) {
			return nil, fmt.Errorf("unknown soc field: %q", p)
		}
		if slices.Contains(fields, f) {
			return nil, fmt.Errorf("duplicate soc field: %q", p)
		}
		fields = append(fields, f)
	}
	if len(fields) == 0 {
		return []string{socFieldPayload}, nil
	}
	return fields, nil
}

// socFieldsBytes serializes the requested SOC fields of c into one byte slice.
// The fields are concatenated in the order given, with no separators or length
// prefixes between them.
//
// The span and payload fields are both cut from the wrapped chunk data: the
// first swarm.SpanSize bytes are the span and the remainder is the payload.
//
// It returns an error when the SOC address cannot be computed, or when fields
// contains an identifier that is not one of the socField* constants. The latter
// is reported rather than skipped because a silently omitted field would shift
// every following field in the output and the receiver could not detect it.
func socFieldsBytes(c *soc.SOC, fields []string) ([]byte, error) {
	var buf bytes.Buffer
	for _, f := range fields {
		switch f {
		case socFieldAddress:
			addr, err := c.Address()
			if err != nil {
				return nil, fmt.Errorf("soc address: %w", err)
			}
			buf.Write(addr.Bytes())
		case socFieldRecoveredPubKey:
			buf.Write(c.OwnerPubKey())
		case socFieldIdentifier:
			buf.Write(c.ID())
		case socFieldSignature:
			buf.Write(c.Signature())
		case socFieldWrappedAddress:
			buf.Write(c.WrappedChunk().Address().Bytes())
		case socFieldSpan:
			buf.Write(c.WrappedChunk().Data()[:swarm.SpanSize])
		case socFieldPayload:
			buf.Write(c.WrappedChunk().Data()[swarm.SpanSize:])
		default:
			return nil, fmt.Errorf("unknown soc field: %q", f)
		}
	}
	return buf.Bytes(), nil
}

func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("gsoc_subscribe").Build()

Expand All @@ -26,9 +119,31 @@
return
}

headers := struct {
SocFields string `map:"Swarm-Soc-Fields"`
CacheWrappedChunk bool `map:"Swarm-Cache-Wrapped-Chunk"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

fields, err := parseSocFields(headers.SocFields)
if err != nil {
logger.Debug("invalid soc fields header", "error", err)

Check failure on line 133 in pkg/api/gsoc.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "invalid soc fields header" 3 times.

See more on https://sonarcloud.io/project/issues?id=ethersphere_bee&issues=AZ6nEuYMxmPCWMBDjXDO&open=AZ6nEuYMxmPCWMBDjXDO&pullRequest=5497
logger.Error(nil, "invalid soc fields header")
jsonhttp.BadRequest(w, "invalid soc fields header")
return
}

upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkSize,
WriteBufferSize: swarm.ChunkSize,
ReadBufferSize: swarm.SocMaxChunkSize,
// WriteBufferSize is only an I/O buffer hint; it does not cap the
// message size. The serialized output can be the whole single owner
// chunk plus the derived metadata fields (soc address, recovered public
// key, wrapped chunk address), so size it to that maximum to avoid split
// writes.
WriteBufferSize: maxSocFieldsSize,
CheckOrigin: s.checkOrigin,
}

Expand All @@ -41,25 +156,39 @@
}

s.wsWg.Add(1)
go s.gsocListeningWs(conn, paths.Address)
go s.gsocListeningWs(conn, paths.Address, fields, headers.CacheWrappedChunk)
}

func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address) {
func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress swarm.Address, fields []string, cacheWrappedChunk bool) {
defer s.wsWg.Done()

var (
dataC = make(chan []byte)
gone = make(chan struct{})
ticker = time.NewTicker(s.WsPingPeriod)
err error
dataC = make(chan []byte)
gone = make(chan struct{})
ticker = time.NewTicker(s.WsPingPeriod)
ctx, cancel = context.WithCancel(context.Background()) // for storing cached chunks
err error
)
defer func() {
cancel()
ticker.Stop()
_ = conn.Close()
}()
cleanup := s.gsoc.Subscribe(socAddress, func(m []byte) {
cleanup := s.gsoc.Subscribe(socAddress, func(c *soc.SOC) {
if cacheWrappedChunk {
if err := s.storer.Cache().Put(ctx, c.WrappedChunk()); err != nil {
s.logger.Debug("gsoc ws: cache wrapped chunk failed", "error", err)
}
}

b, err := socFieldsBytes(c, fields)
if err != nil {
s.logger.Warning("gsoc ws: serialize soc fields failed", "error", err)
return
}

select {
case dataC <- m:
case dataC <- b:
Comment thread
nugaon marked this conversation as resolved.
case <-gone:
return
case <-s.quit:
Expand Down
Loading
Loading