Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/upload/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ var Cmd = &cobra.Command{
preparation.WithAssumeUnchangedSources(rootFlags.assumeUnchangedSources),
preparation.WithEventBus(eb),
preparation.WithReplicas(cfg.Upload.Replicas),
preparation.WithPutHTTPClient(cmdutil.TracedHTTPClient),
)
allUploads, err := api.FindOrCreateUploads(ctx, spaceDID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/upload/source/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var AddCmd = &cobra.Command{
return err
}

api := preparation.NewAPI(repo, client)
api := preparation.NewAPI(repo, client, preparation.WithPutHTTPClient(cmdutil.TracedHTTPClient))

// Parse shard size if provided
var spaceOptions []model.SpaceOption
Expand Down
12 changes: 7 additions & 5 deletions internal/cmdutil/cmdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ func envSigner() (principal.Signer, error) {
return signer.Parse(str)
}

var tracedHttpClient = &http.Client{
Transport: otelhttp.NewTransport(http.DefaultTransport),
// TracedHTTPClient is an HTTP client with OpenTelemetry tracing and a guppy
// User-Agent header on all outbound requests.
var TracedHTTPClient = &http.Client{
Transport: newUserAgentTransport(otelhttp.NewTransport(http.DefaultTransport)),
}

// MustGetClient creates a new client suitable for the CLI, using stored data,
Expand Down Expand Up @@ -77,7 +79,7 @@ func MustGetClientForNetwork(storePath string, networkCfg config.NetworkConfig,

conn, err := uclient.NewConnection(
network.UploadID,
uhttp.NewChannel(&network.UploadURL, uhttp.WithClient(tracedHttpClient)),
uhttp.NewChannel(&network.UploadURL, uhttp.WithClient(TracedHTTPClient)),
uclient.WithOutboundCodec(car.NewOutboundCodec()),
)
if err != nil {
Expand All @@ -90,7 +92,7 @@ func MustGetClientForNetwork(storePath string, networkCfg config.NetworkConfig,
append(
options,
client.WithConnection(conn),
client.WithReceiptsClient(receiptclient.New(&network.ReceiptsURL, receiptclient.WithHTTPClient(tracedHttpClient))),
client.WithReceiptsClient(receiptclient.New(&network.ReceiptsURL, receiptclient.WithHTTPClient(TracedHTTPClient))),
)...,
)...,
)
Expand Down Expand Up @@ -132,7 +134,7 @@ func MustGetIndexClient(networkCfg config.NetworkConfig) (*indexclient.Client, u
func MustGetIndexClientForNetwork(networkCfg config.NetworkConfig, flagName string) (*indexclient.Client, ucan.Principal) {
network := MustGetNetworkConfig(networkCfg, flagName)

client, err := indexclient.New(network.IndexerID, network.IndexerURL, indexclient.WithHTTPClient(tracedHttpClient))
client, err := indexclient.New(network.IndexerID, network.IndexerURL, indexclient.WithHTTPClient(TracedHTTPClient))
if err != nil {
log.Fatal(err)
}
Expand Down
26 changes: 26 additions & 0 deletions internal/cmdutil/useragent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package cmdutil

import (
"fmt"
"net/http"

"github.com/storacha/guppy/pkg/build"
)

type userAgentTransport struct {
userAgent string
base http.RoundTripper
}

func (t *userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) {
req = req.Clone(req.Context())
req.Header.Set("User-Agent", t.userAgent)
return t.base.RoundTrip(req)
}

func newUserAgentTransport(base http.RoundTripper) http.RoundTripper {
return &userAgentTransport{
userAgent: fmt.Sprintf("guppy/%s", build.Version),
base: base,
}
}
16 changes: 16 additions & 0 deletions pkg/preparation/preparation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"path/filepath"

"github.com/ipfs/go-cid"
"github.com/storacha/go-ucanto/did"

"github.com/storacha/guppy/pkg/bus"
clientpkg "github.com/storacha/guppy/pkg/client"
"github.com/storacha/guppy/pkg/preparation/blobs"
"github.com/storacha/guppy/pkg/preparation/dags"
"github.com/storacha/guppy/pkg/preparation/dags/nodereader"
Expand Down Expand Up @@ -61,6 +63,7 @@ type config struct {
assumeUnchangedSources bool
bus bus.Bus
replicas uint
putHTTPClient *http.Client
}

const (
Expand Down Expand Up @@ -156,6 +159,10 @@ func NewAPI(repo Repo, client StorachaClient, options ...Option) API {
ShardEncoder: blobs.NewCAREncoder(),
}

var blobAddOptions []clientpkg.SpaceBlobAddOption
if cfg.putHTTPClient != nil {
blobAddOptions = append(blobAddOptions, clientpkg.WithPutClient(cfg.putHTTPClient))
}
storachaAPI := storacha.API{
Repo: repo,
Client: client,
Expand All @@ -164,6 +171,7 @@ func NewAPI(repo Repo, client StorachaClient, options ...Option) API {
BlobUploadParallelism: cfg.blobUploadParallelism,
Bus: cfg.bus,
Replicas: cfg.replicas,
BlobAddOptions: blobAddOptions,
}

uploadsAPI = uploads.API{
Expand Down Expand Up @@ -247,6 +255,14 @@ func WithReplicas(replicas uint) Option {
}
}

// WithPutHTTPClient sets the HTTP client used for blob PUT uploads.
func WithPutHTTPClient(c *http.Client) Option {
return func(cfg *config) error {
cfg.putHTTPClient = c
return nil
}
}

func (a API) FindOrCreateSpace(ctx context.Context, spaceDID did.DID, name string, options ...spacesmodel.SpaceOption) (*spacesmodel.Space, error) {
return a.Spaces.FindOrCreateSpace(ctx, spaceDID, name, options...)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/preparation/storacha/storacha.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type API struct {
BlobUploadParallelism int
Bus bus.Publisher
Replicas uint
BlobAddOptions []client.SpaceBlobAddOption
}

var _ uploads.AddShardsForUploadFunc = API{}.AddShardsForUpload
Expand Down Expand Up @@ -336,7 +337,7 @@ func (a API) spaceBlobAdd(ctx context.Context, content io.Reader, spaceDID did.D
ctx, span := tracer.Start(ctx, "space-blob-add")
defer span.End()

return a.Client.SpaceBlobAdd(ctx, content, spaceDID, opts...)
return a.Client.SpaceBlobAdd(ctx, content, spaceDID, append(a.BlobAddOptions, opts...)...)
}

func (a API) spaceBlobReplicate(ctx context.Context, blob model.Blob, spaceDID did.DID, locationCommitment delegation.Delegation) error {
Expand Down
Loading