diff --git a/cmd/upload/root.go b/cmd/upload/root.go index b7ecba44..83494f32 100644 --- a/cmd/upload/root.go +++ b/cmd/upload/root.go @@ -107,6 +107,7 @@ var Cmd = &cobra.Command{ preparation.WithAssumeUnchangedSources(rootFlags.assumeUnchangedSources), preparation.WithEventBus(eb), preparation.WithReplicas(cfg.Upload.Replicas), + preparation.WithPutHTTPClient(cmdutil.TracedHTTPClient), ) allUploads, err := api.FindOrCreateUploads(ctx, spaceDID) if err != nil { diff --git a/cmd/upload/source/add.go b/cmd/upload/source/add.go index 34bdd1d9..315649d1 100644 --- a/cmd/upload/source/add.go +++ b/cmd/upload/source/add.go @@ -73,7 +73,7 @@ var AddCmd = &cobra.Command{ return err } - api := preparation.NewAPI(repo, client) + api := preparation.NewAPI(repo, client, preparation.WithPutHTTPClient(cmdutil.TracedHTTPClient)) // Parse shard size if provided var spaceOptions []model.SpaceOption diff --git a/internal/cmdutil/cmdutil.go b/internal/cmdutil/cmdutil.go index 38cd85d9..19ef354e 100644 --- a/internal/cmdutil/cmdutil.go +++ b/internal/cmdutil/cmdutil.go @@ -43,8 +43,10 @@ func envSigner() (principal.Signer, error) { return signer.Parse(str) } -var tracedHttpClient = &http.Client{ - Transport: otelhttp.NewTransport(http.DefaultTransport), +// TracedHTTPClient is an HTTP client with OpenTelemetry tracing and a guppy +// User-Agent header on all outbound requests. +var TracedHTTPClient = &http.Client{ + Transport: newUserAgentTransport(otelhttp.NewTransport(http.DefaultTransport)), } // MustGetClient creates a new client suitable for the CLI, using stored data, @@ -77,7 +79,7 @@ func MustGetClientForNetwork(storePath string, networkCfg config.NetworkConfig, conn, err := uclient.NewConnection( network.UploadID, - uhttp.NewChannel(&network.UploadURL, uhttp.WithClient(tracedHttpClient)), + uhttp.NewChannel(&network.UploadURL, uhttp.WithClient(TracedHTTPClient)), uclient.WithOutboundCodec(car.NewOutboundCodec()), ) if err != nil { @@ -90,7 +92,7 @@ func MustGetClientForNetwork(storePath string, networkCfg config.NetworkConfig, append( options, client.WithConnection(conn), - client.WithReceiptsClient(receiptclient.New(&network.ReceiptsURL, receiptclient.WithHTTPClient(tracedHttpClient))), + client.WithReceiptsClient(receiptclient.New(&network.ReceiptsURL, receiptclient.WithHTTPClient(TracedHTTPClient))), )..., )..., ) @@ -132,7 +134,7 @@ func MustGetIndexClient(networkCfg config.NetworkConfig) (*indexclient.Client, u func MustGetIndexClientForNetwork(networkCfg config.NetworkConfig, flagName string) (*indexclient.Client, ucan.Principal) { network := MustGetNetworkConfig(networkCfg, flagName) - client, err := indexclient.New(network.IndexerID, network.IndexerURL, indexclient.WithHTTPClient(tracedHttpClient)) + client, err := indexclient.New(network.IndexerID, network.IndexerURL, indexclient.WithHTTPClient(TracedHTTPClient)) if err != nil { log.Fatal(err) } diff --git a/internal/cmdutil/useragent.go b/internal/cmdutil/useragent.go new file mode 100644 index 00000000..267d41cc --- /dev/null +++ b/internal/cmdutil/useragent.go @@ -0,0 +1,26 @@ +package cmdutil + +import ( + "fmt" + "net/http" + + "github.com/storacha/guppy/pkg/build" +) + +type userAgentTransport struct { + userAgent string + base http.RoundTripper +} + +func (t *userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req = req.Clone(req.Context()) + req.Header.Set("User-Agent", t.userAgent) + return t.base.RoundTrip(req) +} + +func newUserAgentTransport(base http.RoundTripper) http.RoundTripper { + return &userAgentTransport{ + userAgent: fmt.Sprintf("guppy/%s", build.Version), + base: base, + } +} diff --git a/pkg/preparation/preparation.go b/pkg/preparation/preparation.go index c3a93b74..3341e37c 100644 --- a/pkg/preparation/preparation.go +++ b/pkg/preparation/preparation.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io/fs" + "net/http" "os" "path/filepath" @@ -12,6 +13,7 @@ import ( "github.com/storacha/go-ucanto/did" "github.com/storacha/guppy/pkg/bus" + clientpkg "github.com/storacha/guppy/pkg/client" "github.com/storacha/guppy/pkg/preparation/blobs" "github.com/storacha/guppy/pkg/preparation/dags" "github.com/storacha/guppy/pkg/preparation/dags/nodereader" @@ -61,6 +63,7 @@ type config struct { assumeUnchangedSources bool bus bus.Bus replicas uint + putHTTPClient *http.Client } const ( @@ -156,6 +159,10 @@ func NewAPI(repo Repo, client StorachaClient, options ...Option) API { ShardEncoder: blobs.NewCAREncoder(), } + var blobAddOptions []clientpkg.SpaceBlobAddOption + if cfg.putHTTPClient != nil { + blobAddOptions = append(blobAddOptions, clientpkg.WithPutClient(cfg.putHTTPClient)) + } storachaAPI := storacha.API{ Repo: repo, Client: client, @@ -164,6 +171,7 @@ func NewAPI(repo Repo, client StorachaClient, options ...Option) API { BlobUploadParallelism: cfg.blobUploadParallelism, Bus: cfg.bus, Replicas: cfg.replicas, + BlobAddOptions: blobAddOptions, } uploadsAPI = uploads.API{ @@ -247,6 +255,14 @@ func WithReplicas(replicas uint) Option { } } +// WithPutHTTPClient sets the HTTP client used for blob PUT uploads. +func WithPutHTTPClient(c *http.Client) Option { + return func(cfg *config) error { + cfg.putHTTPClient = c + return nil + } +} + func (a API) FindOrCreateSpace(ctx context.Context, spaceDID did.DID, name string, options ...spacesmodel.SpaceOption) (*spacesmodel.Space, error) { return a.Spaces.FindOrCreateSpace(ctx, spaceDID, name, options...) } diff --git a/pkg/preparation/storacha/storacha.go b/pkg/preparation/storacha/storacha.go index 3fedf426..b51806b1 100644 --- a/pkg/preparation/storacha/storacha.go +++ b/pkg/preparation/storacha/storacha.go @@ -65,6 +65,7 @@ type API struct { BlobUploadParallelism int Bus bus.Publisher Replicas uint + BlobAddOptions []client.SpaceBlobAddOption } var _ uploads.AddShardsForUploadFunc = API{}.AddShardsForUpload @@ -336,7 +337,7 @@ func (a API) spaceBlobAdd(ctx context.Context, content io.Reader, spaceDID did.D ctx, span := tracer.Start(ctx, "space-blob-add") defer span.End() - return a.Client.SpaceBlobAdd(ctx, content, spaceDID, opts...) + return a.Client.SpaceBlobAdd(ctx, content, spaceDID, append(a.BlobAddOptions, opts...)...) } func (a API) spaceBlobReplicate(ctx context.Context, blob model.Blob, spaceDID did.DID, locationCommitment delegation.Delegation) error {