diff --git a/CLAUDE.md b/CLAUDE.md index d6bf56f..92a2888 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -76,6 +76,7 @@ Configuration via YAML file or environment variables with `SPRUE_` prefix: - `SPRUE_STORAGE_POSTGRES_DSN`, `SPRUE_STORAGE_POSTGRES_MAX_CONNS`, `SPRUE_STORAGE_POSTGRES_SKIP_MIGRATIONS` - `SPRUE_STORAGE_DYNAMODB_*` for DynamoDB settings (AWS backend) - `SPRUE_STORAGE_S3_*` for S3/MinIO settings +- `SPRUE_TELEMETRY_TRACES_*` and `SPRUE_TELEMETRY_METRICS_*` — OTLP/HTTP exporter config. Empty endpoint disables that signal. Supports `endpoint`, `insecure`, `headers` (comma-separated `k=v,k=v` via env/flag), `timeout`, `compression` (`"" | "gzip"`), plus `sample_ratio` (traces) or `export_interval` (metrics). The same fields are exposed as hidden `--telemetry-traces-*` / `--telemetry-metrics-*` flags on `serve`. ### Key Dependencies diff --git a/cmd/client/lib/client.go b/cmd/client/lib/client.go index 368fea0..90227f0 100644 --- a/cmd/client/lib/client.go +++ b/cmd/client/lib/client.go @@ -20,7 +20,7 @@ func InitClient(cmd *cobra.Command) (*client.Client, *config.Config, *zap.Logger cfg, err := config.Load(configFile) cobra.CheckErr(err) - logger, err := fx.NewLogger(cfg) + logger, err := fx.NewZapLogger(cfg) cobra.CheckErr(err) id, err := fx.NewIdentity(cfg, logger) cobra.CheckErr(err) diff --git a/cmd/main.go b/cmd/main.go index 0523233..6e729a9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,7 +15,10 @@ import ( appfx "github.com/storacha/sprue/internal/fx" ) -var cfgFile string +var ( + cfgFile string + loaded *config.Config +) func main() { rootCmd := &cobra.Command{ @@ -31,6 +34,11 @@ Routes blob allocations to Piri nodes and tracks upload state in DynamoDB.`, RunE: runServe, } + // Hidden telemetry flags on `serve`. Registered with zero-value defaults; + // actual binding into viper happens in PersistentPreRunE after cobra has + // parsed argv, so pflag.Changed reflects only flags the user passed. + config.RegisterTelemetryFlags(serveCmd) + rootCmd.AddCommand(serveCmd) rootCmd.AddCommand(client.Cmd) rootCmd.AddCommand(identity.Cmd) @@ -38,6 +46,28 @@ Routes blob allocations to Piri nodes and tracks upload state in DynamoDB.`, // Global flags rootCmd.PersistentFlags().StringVarP(&cfgFile, "config", "c", "", "config file path (default: looks for config.yaml in current dir)") + // PersistentPreRunE only loads config for the `serve` subcommand — the + // client / identity commands load their own state from elsewhere. + rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { + if cmd != serveCmd { + return nil + } + cfg, v, err := config.LoadWithViper(cfgFile) + if err != nil { + return err + } + if err := config.BindTelemetryFlags(v, cmd); err != nil { + return err + } + // Re-unmarshal so that any flag values bound above now override yaml/env. + cfg, err = config.Unmarshal(v) + if err != nil { + return err + } + loaded = cfg + return nil + } + if err := rootCmd.Execute(); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -45,11 +75,12 @@ Routes blob allocations to Piri nodes and tracks upload state in DynamoDB.`, } func runServe(cmd *cobra.Command, args []string) error { - cfg, err := config.Load(cfgFile) - cobra.CheckErr(err) + if loaded == nil { + return fmt.Errorf("config not loaded") + } app := fx.New( - appfx.AppModule(cfg), + appfx.AppModule(loaded), // Suppress fx's default logging and use our own zap logger fx.WithLogger(func(log *zap.Logger) fxevent.Logger { return &fxevent.ZapLogger{Logger: log} diff --git a/config.example.yaml b/config.example.yaml index 7d4f3c5..ffb0cbd 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -108,3 +108,30 @@ storage: log: # Log level: debug, info, warn, error level: "info" + +# OpenTelemetry export to an OTLP/HTTP collector. Traces and metrics are +# configured independently; an empty endpoint disables that signal. Resource +# attributes (service.name, deployment.environment) are derived at startup — +# service.name is always "sprue" and environment comes from deployment.environment +# above. The example block below is commented out so the defaults stay +# disabled. +# +# All fields can also be set via env var (e.g. SPRUE_TELEMETRY_TRACES_ENDPOINT) +# or as hidden flags on `serve` (e.g. --telemetry-traces-endpoint). +#telemetry: +# traces: +# endpoint: "otel-collector:4318" # empty = disabled +# insecure: true # http:// when scheme ambiguous +# headers: # sent on every export request +# Authorization: "Bearer ..." +# timeout: "10s" # 0 uses SDK default (10s) +# compression: "gzip" # "" or "gzip" +# sample_ratio: 0 # probability of tracing an unparented request; 0 = never (parented requests are always honoured) +# metrics: +# endpoint: "otel-collector:4318" # empty = disabled +# insecure: true +# headers: +# Authorization: "Bearer ..." +# timeout: "10s" +# compression: "gzip" +# export_interval: "60s" # 0 uses SDK default (60s) diff --git a/go.mod b/go.mod index 6da5e59..e7ec9f9 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.34 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.56.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.96.4 + github.com/exaring/otelpgx v0.10.0 github.com/google/uuid v1.6.0 github.com/ipfs/go-cid v0.6.0 github.com/ipfs/go-log/v2 v2.9.0 @@ -29,11 +30,20 @@ require ( github.com/testcontainers/testcontainers-go/modules/dynamodb v0.41.0 github.com/testcontainers/testcontainers-go/modules/minio v0.40.0 github.com/testcontainers/testcontainers-go/modules/postgres v0.42.0 + github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.2 + go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 + go.opentelemetry.io/otel/sdk v1.43.0 + go.opentelemetry.io/otel/sdk/metric v1.43.0 go.uber.org/fx v1.24.0 go.uber.org/zap v1.27.0 ) require ( + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect @@ -42,10 +52,11 @@ require ( github.com/moby/moby/client v0.4.0 // indirect github.com/moby/sys/atomicwriter v0.1.0 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 // indirect - go.opentelemetry.io/otel/sdk v1.43.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect + github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.2 // indirect + go.opentelemetry.io/otel/log v0.6.0 // indirect + go.opentelemetry.io/proto/otlp v1.10.0 // indirect golang.org/x/sync v0.20.0 // indirect + google.golang.org/grpc v1.80.0 // indirect ) require ( @@ -92,7 +103,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect - github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect @@ -167,10 +178,10 @@ require ( github.com/whyrusleeping/cbor-gen v0.3.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 // indirect - go.opentelemetry.io/otel v1.43.0 // indirect - go.opentelemetry.io/otel/metric v1.43.0 // indirect - go.opentelemetry.io/otel/trace v1.43.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 + go.opentelemetry.io/otel v1.43.0 + go.opentelemetry.io/otel/metric v1.43.0 + go.opentelemetry.io/otel/trace v1.43.0 go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.19.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 7dc4c21..031d37e 100644 --- a/go.sum +++ b/go.sum @@ -166,6 +166,8 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/exaring/otelpgx v0.10.0 h1:NGGegdoBQM3jNZDKG8ENhigUcgBN7d7943L0YlcIpZc= +github.com/exaring/otelpgx v0.10.0/go.mod h1:R5/M5LWsPPBZc1SrRE5e0DiU48bI78C1/GPTWs6I66U= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= @@ -234,6 +236,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -278,7 +282,6 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= -github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= @@ -682,6 +685,10 @@ github.com/tklauser/numcpus v0.11.0 h1:nSTwhKH5e1dMNsCdVBukSZrURJRoHbSEQjdEbY+9R github.com/tklauser/numcpus v0.11.0/go.mod h1:z+LwcLq54uWZTX0u/bGobaV34u6V7KNlTZejzM6/3MQ= github.com/ucan-wg/go-ucan v0.0.0-20240916120445-37f52863156c h1:A1pMNIlHPnJ6KROqNc6SKg7QlSiQA6umiEoy89Os4cM= github.com/ucan-wg/go-ucan v0.0.0-20240916120445-37f52863156c/go.mod h1:IiRc1OKWUk7FziOTWmOo7iwbcEMr7ch0lgs3UrF13pU= +github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.2 h1:3/aHKUq7qaFMWxyQV0W2ryNgg8x8rVeKVA20KJUkfS0= +github.com/uptrace/opentelemetry-go-extra/otelutil v0.3.2/go.mod h1:Zit4b8AQXaXvA68+nzmbyDzqiyFRISyw1JiD5JqUBjw= +github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.2 h1:cj/Z6FKTTYBnstI0Lni9PA+k2foounKIPUmj1LBwNiQ= +github.com/uptrace/opentelemetry-go-extra/otelzap v0.3.2/go.mod h1:LDaXk90gKEC2nC7JH3Lpnhfu+2V7o/TsqomJJmqA39o= github.com/urfave/cli v1.22.10/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= @@ -715,14 +722,22 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0 h1:6YeICKmGrvgJ5th4+OMNpcuoB6q/Xs8gt0YCO7MUv1k= +go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0/go.mod h1:ZEA7j2B35siNV0T00aapacNzjz4tvOlNoHp0ncCfwNQ= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0 h1:OyrsyzuttWTSur2qN/Lm0m2a8yqyIjUVBZcxFPuXq2o= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.67.0/go.mod h1:C2NGBr+kAB4bk3xtMXfZ94gqFDtg/GkI7e9zqGh5Beg= +go.opentelemetry.io/contrib/propagators/b3 v1.38.0 h1:uHsCCOSKl0kLrV2dLkFK+8Ywk9iKa/fptkytc6aFFEo= +go.opentelemetry.io/contrib/propagators/b3 v1.38.0/go.mod h1:wMRSZJZcY8ya9mApLLhwIMjqmApy2o/Ml+62lhvxyHU= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 h1:w1K+pCJoPpQifuVpsKamUdn9U0zM3xUziVOqsGksUrY= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0/go.mod h1:HBy4BjzgVE8139ieRI75oXm3EcDN+6GhD88JT1Kjvxg= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0 h1:3iZJKlCZufyRzPzlQhUIWVmfltrXuGyfjREgGP3UUjc= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.43.0/go.mod h1:/G+nUPfhq2e+qiXMGxMwumDrP5jtzU+mWN7/sjT2rak= +go.opentelemetry.io/otel/log v0.6.0 h1:nH66tr+dmEgW5y+F9LanGJUBYPrRgP4g2EkmPE3LeK8= +go.opentelemetry.io/otel/log v0.6.0/go.mod h1:KdySypjQHhP069JX0z/t26VHwa8vSwzgaKmXtIB3fJM= go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= @@ -1018,6 +1033,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= +gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= +gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg= diff --git a/internal/config/config.go b/internal/config/config.go index 66ac223..dc67d8b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,8 +2,12 @@ package config import ( "fmt" + "reflect" "strings" + "time" + "github.com/go-viper/mapstructure/v2" + "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -23,6 +27,7 @@ type Config struct { Storage StorageConfig `mapstructure:"storage"` Log LogConfig `mapstructure:"log"` Mailer MailerConfig `mapstructure:"mailer"` + Telemetry TelemetryConfig `mapstructure:"telemetry"` } type DeploymentConfig struct { @@ -164,6 +169,173 @@ type MailerConfig struct { SMTPAuthSecret string `mapstructure:"smtp_auth_secret"` } +// TelemetryConfig configures OpenTelemetry trace and metric export to an OTLP +// collector. Traces and Metrics are configured independently; leaving either +// Endpoint empty disables that signal. Resource attributes (service.name, +// deployment.environment) are derived at startup and are not configurable +// here to avoid splitting the service's identity across sources of truth. +type TelemetryConfig struct { + Traces TracesConfig `mapstructure:"traces"` + Metrics MetricsConfig `mapstructure:"metrics"` +} + +// TracesConfig holds OTLP/HTTP trace exporter settings. +type TracesConfig struct { + // Endpoint is the OTLP/HTTP collector endpoint (host:port or full URL). + // Empty disables trace export. + Endpoint string `mapstructure:"endpoint"` + // Insecure forces http:// when the Endpoint scheme is ambiguous. + Insecure bool `mapstructure:"insecure"` + // Headers are sent with each export request (e.g. {"Authorization": "Bearer ..."}). + // When set via env var or flag, use comma-separated "k1=v1,k2=v2" form. + Headers map[string]string `mapstructure:"headers"` + // Timeout bounds each export request. Zero uses the SDK default (10s). + Timeout time.Duration `mapstructure:"timeout"` + // Compression is "" (no compression) or "gzip". + Compression string `mapstructure:"compression"` + // SampleRatio is the probability [0,1] that a new root trace — one + // where the incoming request has no upstream traceparent — is recorded. + // 0 (the default) means unparented requests (e.g. health-check probes) + // are never traced. Requests carrying a sampled traceparent are always + // honoured regardless of this value. + SampleRatio float64 `mapstructure:"sample_ratio"` +} + +// MetricsConfig holds OTLP/HTTP metric exporter settings. +type MetricsConfig struct { + // Endpoint is the OTLP/HTTP collector endpoint (host:port or full URL). + // Empty disables metric export. + Endpoint string `mapstructure:"endpoint"` + // Insecure forces http:// when the Endpoint scheme is ambiguous. + Insecure bool `mapstructure:"insecure"` + // Headers are sent with each export request. + // When set via env var or flag, use comma-separated "k1=v1,k2=v2" form. + Headers map[string]string `mapstructure:"headers"` + // Timeout bounds each export request. Zero uses the SDK default (10s). + Timeout time.Duration `mapstructure:"timeout"` + // Compression is "" (no compression) or "gzip". + Compression string `mapstructure:"compression"` + // ExportInterval is the periodic push interval. Zero uses the SDK default (60s). + ExportInterval time.Duration `mapstructure:"export_interval"` +} + +// RegisterTelemetryFlags defines the hidden --telemetry-* flags on cmd. +// Flags are registered with zero-value defaults so that an unset flag does +// not shadow an env var or yaml value via viper's BindPFlag path. Actual +// binding to a viper instance is done later, after cobra has parsed argv. +func RegisterTelemetryFlags(cmd *cobra.Command) { + f := cmd.Flags() + + f.String("telemetry-traces-endpoint", "", "OTLP/HTTP traces endpoint (host:port or URL); empty disables") + f.Bool("telemetry-traces-insecure", false, "use http:// for traces when scheme ambiguous") + f.String("telemetry-traces-headers", "", "traces exporter headers, comma-separated k=v pairs") + f.Duration("telemetry-traces-timeout", 0, "traces export timeout (SDK default when 0)") + f.String("telemetry-traces-compression", "", "traces exporter compression: empty or \"gzip\"") + f.Float64("telemetry-traces-sample-ratio", 0, "head-based sample ratio [0,1]; 0 uses AlwaysSample") + + f.String("telemetry-metrics-endpoint", "", "OTLP/HTTP metrics endpoint (host:port or URL); empty disables") + f.Bool("telemetry-metrics-insecure", false, "use http:// for metrics when scheme ambiguous") + f.String("telemetry-metrics-headers", "", "metrics exporter headers, comma-separated k=v pairs") + f.Duration("telemetry-metrics-timeout", 0, "metrics export timeout (SDK default when 0)") + f.String("telemetry-metrics-compression", "", "metrics exporter compression: empty or \"gzip\"") + f.Duration("telemetry-metrics-export-interval", 0, "metrics export interval (SDK default when 0)") + + hidden := []string{ + "telemetry-traces-endpoint", + "telemetry-traces-insecure", + "telemetry-traces-headers", + "telemetry-traces-timeout", + "telemetry-traces-compression", + "telemetry-traces-sample-ratio", + "telemetry-metrics-endpoint", + "telemetry-metrics-insecure", + "telemetry-metrics-headers", + "telemetry-metrics-timeout", + "telemetry-metrics-compression", + "telemetry-metrics-export-interval", + } + for _, name := range hidden { + _ = f.MarkHidden(name) + } +} + +// BindTelemetryFlags maps the hidden --telemetry-* flags on cmd to their +// dotted viper keys under "telemetry.*". Call this after cobra has parsed +// argv so pflag.Changed accurately reflects which flags the user passed. +func BindTelemetryFlags(v *viper.Viper, cmd *cobra.Command) error { + bindings := map[string]string{ + "telemetry.traces.endpoint": "telemetry-traces-endpoint", + "telemetry.traces.insecure": "telemetry-traces-insecure", + "telemetry.traces.headers": "telemetry-traces-headers", + "telemetry.traces.timeout": "telemetry-traces-timeout", + "telemetry.traces.compression": "telemetry-traces-compression", + "telemetry.traces.sample_ratio": "telemetry-traces-sample-ratio", + "telemetry.metrics.endpoint": "telemetry-metrics-endpoint", + "telemetry.metrics.insecure": "telemetry-metrics-insecure", + "telemetry.metrics.headers": "telemetry-metrics-headers", + "telemetry.metrics.timeout": "telemetry-metrics-timeout", + "telemetry.metrics.compression": "telemetry-metrics-compression", + "telemetry.metrics.export_interval": "telemetry-metrics-export-interval", + } + for key, flag := range bindings { + f := cmd.Flags().Lookup(flag) + if f == nil { + continue + } + if err := v.BindPFlag(key, f); err != nil { + return fmt.Errorf("binding flag %q: %w", flag, err) + } + } + return nil +} + +// stringToStringMapHookFunc parses comma-separated "k1=v1,k2=v2" strings into +// map[string]string when the target field is a string-keyed, string-valued +// map. It is a no-op for every other from/to combination so it can be safely +// composed with viper's default hooks. +func stringToStringMapHookFunc() mapstructure.DecodeHookFuncType { + mapStr := reflect.TypeOf(map[string]string{}) + return func(from, to reflect.Type, data any) (any, error) { + if from.Kind() != reflect.String || to != mapStr { + return data, nil + } + raw, _ := data.(string) + if raw == "" { + return map[string]string{}, nil + } + out := make(map[string]string) + for _, pair := range strings.Split(raw, ",") { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + eq := strings.IndexByte(pair, '=') + if eq < 0 { + return nil, fmt.Errorf("invalid header pair %q: expected k=v", pair) + } + k := strings.TrimSpace(pair[:eq]) + v := strings.TrimSpace(pair[eq+1:]) + if k == "" { + return nil, fmt.Errorf("invalid header pair %q: empty key", pair) + } + out[k] = v + } + return out, nil + } +} + +// decodeHook returns the decode hook chain used by LoadWithViper. It extends +// viper's defaults with stringToStringMapHookFunc so that map[string]string +// fields (e.g. Telemetry.Traces.Headers) can be set from a single "k=v,k=v" +// string coming from an env var or cobra flag. +func decodeHook() mapstructure.DecodeHookFunc { + return mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + stringToStringMapHookFunc(), + ) +} + // SetDefaults configures default values for viper. func SetDefaults(v *viper.Viper) { // Server defaults @@ -246,9 +418,20 @@ func LoadWithViper(configFile string) (*Config, *viper.Viper, error) { } var cfg Config - if err := v.Unmarshal(&cfg); err != nil { + if err := v.Unmarshal(&cfg, viper.DecodeHook(decodeHook())); err != nil { return nil, nil, fmt.Errorf("unmarshaling config: %w", err) } return &cfg, v, nil } + +// Unmarshal decodes v into a Config using the same decode hook chain as +// LoadWithViper. Use this when you need to re-unmarshal after binding flags +// to a viper instance that was produced by LoadWithViper. +func Unmarshal(v *viper.Viper) (*Config, error) { + var cfg Config + if err := v.Unmarshal(&cfg, viper.DecodeHook(decodeHook())); err != nil { + return nil, fmt.Errorf("unmarshaling config: %w", err) + } + return &cfg, nil +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..c4573a9 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,161 @@ +package config + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/spf13/cobra" +) + +const telemetryYAML = ` +telemetry: + traces: + endpoint: "yaml-endpoint:4318" + sample_ratio: 0.25 + metrics: + endpoint: "yaml-metrics:4318" +` + +func writeConfigFile(t *testing.T, body string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + if err := os.WriteFile(path, []byte(body), 0o600); err != nil { + t.Fatalf("write config: %v", err) + } + return path +} + +// clearSprueEnv removes any SPRUE_* vars the test environment might already +// have set, so a test only sees what it deliberately exports. +func clearSprueEnv(t *testing.T) { + t.Helper() + for _, kv := range os.Environ() { + for i := 0; i < len(kv); i++ { + if kv[i] == '=' { + name := kv[:i] + if len(name) >= 6 && name[:6] == "SPRUE_" { + t.Setenv(name, "") + os.Unsetenv(name) + } + break + } + } + } +} + +// loadWithFlags re-runs the Load → BindTelemetryFlags → Unmarshal pipeline +// exactly the way cmd/main.go does it. +func loadWithFlags(t *testing.T, cfgPath string, argv []string) *Config { + t.Helper() + + cfg, v, err := LoadWithViper(cfgPath) + if err != nil { + t.Fatalf("LoadWithViper: %v", err) + } + + cmd := &cobra.Command{Use: "serve", RunE: func(*cobra.Command, []string) error { return nil }} + RegisterTelemetryFlags(cmd) + cmd.SetArgs(argv) + if err := cmd.ParseFlags(argv); err != nil { + t.Fatalf("parse flags: %v", err) + } + if err := BindTelemetryFlags(v, cmd); err != nil { + t.Fatalf("bind flags: %v", err) + } + cfg, err = Unmarshal(v) + if err != nil { + t.Fatalf("re-unmarshal: %v", err) + } + return cfg +} + +func TestTelemetry_YAMLOnly(t *testing.T) { + clearSprueEnv(t) + cfg := loadWithFlags(t, writeConfigFile(t, telemetryYAML), nil) + + if got := cfg.Telemetry.Traces.Endpoint; got != "yaml-endpoint:4318" { + t.Errorf("traces endpoint = %q, want yaml-endpoint:4318", got) + } + if got := cfg.Telemetry.Traces.SampleRatio; got != 0.25 { + t.Errorf("sample_ratio = %v, want 0.25", got) + } + if got := cfg.Telemetry.Metrics.Endpoint; got != "yaml-metrics:4318" { + t.Errorf("metrics endpoint = %q, want yaml-metrics:4318", got) + } +} + +func TestTelemetry_EnvOverridesYAML(t *testing.T) { + clearSprueEnv(t) + t.Setenv("SPRUE_TELEMETRY_TRACES_ENDPOINT", "env-endpoint:4318") + + cfg := loadWithFlags(t, writeConfigFile(t, telemetryYAML), nil) + + if got := cfg.Telemetry.Traces.Endpoint; got != "env-endpoint:4318" { + t.Errorf("traces endpoint = %q, want env-endpoint:4318 (env should win)", got) + } + // yaml still supplies values for fields the env did not touch. + if got := cfg.Telemetry.Metrics.Endpoint; got != "yaml-metrics:4318" { + t.Errorf("metrics endpoint = %q, want yaml-metrics:4318", got) + } +} + +func TestTelemetry_FlagOverridesEnv(t *testing.T) { + clearSprueEnv(t) + t.Setenv("SPRUE_TELEMETRY_TRACES_ENDPOINT", "env-endpoint:4318") + + cfg := loadWithFlags(t, writeConfigFile(t, telemetryYAML), []string{ + "--telemetry-traces-endpoint=flag-endpoint:4318", + }) + + if got := cfg.Telemetry.Traces.Endpoint; got != "flag-endpoint:4318" { + t.Errorf("traces endpoint = %q, want flag-endpoint:4318 (flag should win)", got) + } +} + +// Guards the "zero-default pflag does not clobber env" regression: bind a +// flag that was never passed on the command line, and verify the env value +// still wins. +func TestTelemetry_UnsetFlagDoesNotClobberEnv(t *testing.T) { + clearSprueEnv(t) + t.Setenv("SPRUE_TELEMETRY_TRACES_ENDPOINT", "env-endpoint:4318") + + cfg := loadWithFlags(t, writeConfigFile(t, ""), nil) + + if got := cfg.Telemetry.Traces.Endpoint; got != "env-endpoint:4318" { + t.Errorf("traces endpoint = %q, want env-endpoint:4318 (unset flag must not clobber env)", got) + } +} + +func TestTelemetry_HeadersFromEnv(t *testing.T) { + clearSprueEnv(t) + t.Setenv("SPRUE_TELEMETRY_TRACES_ENDPOINT", "env-endpoint:4318") + t.Setenv("SPRUE_TELEMETRY_TRACES_HEADERS", "Authorization=Bearer xyz,X-Tenant=acme") + + cfg := loadWithFlags(t, writeConfigFile(t, ""), nil) + + h := cfg.Telemetry.Traces.Headers + if got := h["Authorization"]; got != "Bearer xyz" { + t.Errorf("Authorization header = %q, want %q", got, "Bearer xyz") + } + if got := h["X-Tenant"]; got != "acme" { + t.Errorf("X-Tenant header = %q, want acme", got) + } +} + +// The composed decode hook must still decode time.Duration; this test +// catches the regression where adding a hook accidentally drops the default +// StringToTimeDurationHookFunc. +func TestTelemetry_DurationParsesAfterHookCompose(t *testing.T) { + clearSprueEnv(t) + t.Setenv("SPRUE_TELEMETRY_METRICS_ENDPOINT", "env-metrics:4318") + t.Setenv("SPRUE_TELEMETRY_METRICS_EXPORT_INTERVAL", "15s") + + cfg := loadWithFlags(t, writeConfigFile(t, ""), nil) + + if got := cfg.Telemetry.Metrics.ExportInterval; got != 15*time.Second { + t.Errorf("export_interval = %v, want 15s", got) + } +} diff --git a/internal/fx/app.go b/internal/fx/app.go index 0629579..e81af80 100644 --- a/internal/fx/app.go +++ b/internal/fx/app.go @@ -18,6 +18,7 @@ var AppModule = func(cfg *config.Config) fx.Option { fx.Supply(cfg), ConfigModule, LoggerModule, + TelemetryModule, IdentityModule, ServicesModule, ClientsModule, diff --git a/internal/fx/config.go b/internal/fx/config.go index 011f7b5..61ef07e 100644 --- a/internal/fx/config.go +++ b/internal/fx/config.go @@ -22,6 +22,7 @@ type Configs struct { S3 config.S3Config Log config.LogConfig Mailer config.MailerConfig + Telemetry config.TelemetryConfig } // ProvideConfigs provides the individual fields of the config. Inner storage @@ -39,5 +40,6 @@ func ProvideConfigs(cfg *config.Config) Configs { S3: cfg.Storage.S3, Log: cfg.Log, Mailer: cfg.Mailer, + Telemetry: cfg.Telemetry, } } diff --git a/internal/fx/logger.go b/internal/fx/logger.go index a3c8d3f..384b3b3 100644 --- a/internal/fx/logger.go +++ b/internal/fx/logger.go @@ -1,21 +1,39 @@ package fx import ( + "github.com/uptrace/opentelemetry-go-extra/otelzap" "go.uber.org/fx" "go.uber.org/zap" "github.com/storacha/sprue/internal/config" ) -// LoggerModule provides the zap logger. +// LoggerModule provides both *zap.Logger and *otelzap.Logger. Code that has +// no request context injects *zap.Logger unchanged; code that has a context +// can inject *otelzap.Logger to get span-status updates and OTel log bridge +// emission via its Ctx / *Context methods. var LoggerModule = fx.Module("logger", - fx.Provide(NewLogger), + fx.Provide(NewZapLogger), + fx.Provide(NewOtelLogger), ) -// NewLogger creates a zap logger based on the configured log level. -func NewLogger(cfg *config.Config) (*zap.Logger, error) { +// NewZapLogger creates a zap logger based on the configured log level. +func NewZapLogger(cfg *config.Config) (*zap.Logger, error) { if cfg.Log.Level == "debug" || cfg.Deployment.Environment == "development" || cfg.Deployment.Environment == "test" { return zap.NewDevelopment() } return zap.NewProduction() } + +// NewOtelLogger wraps the base zap logger with otelzap so call sites with a +// context can use logger.Ctx(ctx) to annotate the active span and emit to +// the OTel log pipeline. Also registers otelzap globals so free-floating +// otelzap.Ctx(ctx) calls work anywhere in the program. +func NewOtelLogger(base *zap.Logger) *otelzap.Logger { + l := otelzap.New(base, + otelzap.WithMinLevel(zap.InfoLevel), + otelzap.WithErrorStatusLevel(zap.ErrorLevel), + ) + otelzap.ReplaceGlobals(l) + return l +} diff --git a/internal/fx/server.go b/internal/fx/server.go index 6214257..48ad69d 100644 --- a/internal/fx/server.go +++ b/internal/fx/server.go @@ -8,6 +8,7 @@ import ( "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" + "go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho" "go.uber.org/fx" "go.uber.org/zap" @@ -31,7 +32,11 @@ func NewEchoServer( e.HideBanner = true e.HidePort = true - // Middleware + // Middleware — otelecho first so every downstream handler and middleware + // runs inside the HTTP request span. The middleware reads the global + // TracerProvider / TextMapPropagator installed by TelemetryModule, so it + // is a no-op until telemetry is configured. + e.Use(otelecho.Middleware("sprue")) e.Use(middleware.Recover()) e.Use(middleware.RequestID()) e.Use(middleware.RequestLogger()) diff --git a/internal/fx/store/postgres/provider.go b/internal/fx/store/postgres/provider.go index 38041ac..30c5f31 100644 --- a/internal/fx/store/postgres/provider.go +++ b/internal/fx/store/postgres/provider.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" + "github.com/exaring/otelpgx" "github.com/jackc/pgx/v5/pgxpool" "github.com/storacha/sprue/internal/config" "github.com/storacha/sprue/internal/migrations" @@ -95,6 +96,12 @@ func NewPostgresPool(cfg config.PostgresConfig, lc fx.Lifecycle, logger *zap.Log if cfg.MinConns > 0 { poolCfg.MinConns = cfg.MinConns } + // Attach otelpgx as a pgx QueryTracer so every Exec/Query/Batch becomes a + // span parented to the caller's context. Reads from the global + // TracerProvider installed by TelemetryModule — no-op until that's wired. + poolCfg.ConnConfig.Tracer = otelpgx.NewTracer( + otelpgx.WithTrimSQLInSpanName(), + ) pool, err := pgxpool.NewWithConfig(context.Background(), poolCfg) if err != nil { diff --git a/internal/fx/telemetry.go b/internal/fx/telemetry.go new file mode 100644 index 0000000..ed87d78 --- /dev/null +++ b/internal/fx/telemetry.go @@ -0,0 +1,247 @@ +package fx + +import ( + "context" + "errors" + "fmt" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.25.0" + "go.uber.org/fx" + "go.uber.org/zap" + + "github.com/storacha/sprue/internal/config" +) + +const ( + telemetryServiceName = "sprue" + telemetryShutdownBudget = 15 * time.Second +) + +// TelemetryModule sets up OpenTelemetry traces and metrics. The module +// installs the global text-map propagator and OTel error handler +// unconditionally; TracerProvider and MeterProvider are installed only when +// their corresponding Endpoint is configured. +var TelemetryModule = fx.Module("telemetry", + fx.Provide(NewTelemetryProvider), + fx.Invoke(RegisterTelemetryLifecycle), +) + +// TelemetryProvider owns the SDK providers installed as OTel globals so they +// can be flushed and shut down on application stop. +type TelemetryProvider struct { + tp *sdktrace.TracerProvider + mp *sdkmetric.MeterProvider +} + +// NewTelemetryProvider builds the OTLP/HTTP exporters and installs them as +// the global TracerProvider and MeterProvider. Either provider may be nil +// when the corresponding endpoint is empty. +func NewTelemetryProvider( + cfg config.TelemetryConfig, + dep config.DeploymentConfig, + logger *zap.Logger, +) (*TelemetryProvider, error) { + // Always set a propagator so inbound traceparent headers are honoured + // once downstream instrumentation is wired up, even if we don't export. + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + + // Route internal OTel SDK errors through zap so exporter failures are + // visible in structured logs rather than landing on stderr. + otel.SetErrorHandler(&zapErrorHandler{logger: logger}) + + res, err := buildResource(dep) + if err != nil { + return nil, fmt.Errorf("building telemetry resource: %w", err) + } + + t := &TelemetryProvider{} + + if cfg.Traces.Endpoint != "" { + tp, err := newTracerProvider(cfg.Traces, res) + if err != nil { + return nil, fmt.Errorf("tracer provider: %w", err) + } + otel.SetTracerProvider(tp) + t.tp = tp + logger.Info("telemetry traces enabled", + zap.String("endpoint", cfg.Traces.Endpoint), + zap.Float64("sample_ratio", cfg.Traces.SampleRatio), + ) + } else { + logger.Info("telemetry traces disabled") + } + + if cfg.Metrics.Endpoint != "" { + mp, err := newMeterProvider(cfg.Metrics, res) + if err != nil { + return nil, fmt.Errorf("meter provider: %w", err) + } + otel.SetMeterProvider(mp) + t.mp = mp + logger.Info("telemetry metrics enabled", + zap.String("endpoint", cfg.Metrics.Endpoint), + zap.Duration("export_interval", cfg.Metrics.ExportInterval), + ) + } else { + logger.Info("telemetry metrics disabled") + } + + return t, nil +} + +// RegisterTelemetryLifecycle hooks provider shutdown into the fx lifecycle. +// The telemetry module is registered right after LoggerModule in AppModule, +// so its OnStop runs after the HTTP server's — giving any last request +// spans a chance to flush before the exporter closes. +func RegisterTelemetryLifecycle(lc fx.Lifecycle, t *TelemetryProvider, logger *zap.Logger) { + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + if t.tp == nil && t.mp == nil { + return nil + } + logger.Info("flushing telemetry providers") + shutdownCtx, cancel := context.WithTimeout(ctx, telemetryShutdownBudget) + defer cancel() + var errs []error + if t.tp != nil { + if err := t.tp.Shutdown(shutdownCtx); err != nil { + errs = append(errs, fmt.Errorf("tracer provider shutdown: %w", err)) + } + } + if t.mp != nil { + if err := t.mp.Shutdown(shutdownCtx); err != nil { + errs = append(errs, fmt.Errorf("meter provider shutdown: %w", err)) + } + } + return errors.Join(errs...) + }, + }) +} + +func buildResource(dep config.DeploymentConfig) (*resource.Resource, error) { + attrs := []attribute.KeyValue{semconv.ServiceName(telemetryServiceName)} + if dep.Environment != "" { + attrs = append(attrs, semconv.DeploymentEnvironment(dep.Environment)) + } + return resource.Merge(resource.Default(), resource.NewSchemaless(attrs...)) +} + +func newTracerProvider(cfg config.TracesConfig, res *resource.Resource) (*sdktrace.TracerProvider, error) { + opts := []otlptracehttp.Option{otlptracehttp.WithEndpoint(cfg.Endpoint)} + if cfg.Insecure { + opts = append(opts, otlptracehttp.WithInsecure()) + } + if len(cfg.Headers) > 0 { + opts = append(opts, otlptracehttp.WithHeaders(cfg.Headers)) + } + if cfg.Timeout > 0 { + opts = append(opts, otlptracehttp.WithTimeout(cfg.Timeout)) + } + if comp, err := parseTraceCompression(cfg.Compression); err != nil { + return nil, err + } else if cfg.Compression != "" { + opts = append(opts, otlptracehttp.WithCompression(comp)) + } + + exp, err := otlptrace.New(context.Background(), otlptracehttp.NewClient(opts...)) + if err != nil { + return nil, fmt.Errorf("creating otlp trace exporter: %w", err) + } + + // Parent-only sampling: a request with no upstream traceparent is not + // traced by default. This silences synthetic traffic like health-check + // probes and forces sampling decisions to flow from instrumented callers. + // SampleRatio lets operators opt in to probabilistic root sampling when + // they want unparented traffic recorded anyway (e.g. in development). + root := sdktrace.NeverSample() + if cfg.SampleRatio > 0 { + root = sdktrace.TraceIDRatioBased(cfg.SampleRatio) + } + + return sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exp), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.ParentBased(root)), + ), nil +} + +func newMeterProvider(cfg config.MetricsConfig, res *resource.Resource) (*sdkmetric.MeterProvider, error) { + opts := []otlpmetrichttp.Option{otlpmetrichttp.WithEndpoint(cfg.Endpoint)} + if cfg.Insecure { + opts = append(opts, otlpmetrichttp.WithInsecure()) + } + if len(cfg.Headers) > 0 { + opts = append(opts, otlpmetrichttp.WithHeaders(cfg.Headers)) + } + if cfg.Timeout > 0 { + opts = append(opts, otlpmetrichttp.WithTimeout(cfg.Timeout)) + } + if comp, err := parseMetricCompression(cfg.Compression); err != nil { + return nil, err + } else if cfg.Compression != "" { + opts = append(opts, otlpmetrichttp.WithCompression(comp)) + } + + exp, err := otlpmetrichttp.New(context.Background(), opts...) + if err != nil { + return nil, fmt.Errorf("creating otlp metric exporter: %w", err) + } + + readerOpts := []sdkmetric.PeriodicReaderOption{} + if cfg.ExportInterval > 0 { + readerOpts = append(readerOpts, sdkmetric.WithInterval(cfg.ExportInterval)) + } + reader := sdkmetric.NewPeriodicReader(exp, readerOpts...) + + return sdkmetric.NewMeterProvider( + sdkmetric.WithReader(reader), + sdkmetric.WithResource(res), + ), nil +} + +func parseTraceCompression(s string) (otlptracehttp.Compression, error) { + switch s { + case "", "none": + return otlptracehttp.NoCompression, nil + case "gzip": + return otlptracehttp.GzipCompression, nil + default: + return 0, fmt.Errorf("unsupported traces compression %q (valid: none, gzip)", s) + } +} + +func parseMetricCompression(s string) (otlpmetrichttp.Compression, error) { + switch s { + case "", "none": + return otlpmetrichttp.NoCompression, nil + case "gzip": + return otlpmetrichttp.GzipCompression, nil + default: + return 0, fmt.Errorf("unsupported metrics compression %q (valid: none, gzip)", s) + } +} + +// zapErrorHandler adapts otel.ErrorHandler to zap. +type zapErrorHandler struct { + logger *zap.Logger +} + +func (h *zapErrorHandler) Handle(err error) { + if err == nil { + return + } + h.logger.Warn("opentelemetry error", zap.Error(err)) +} diff --git a/pkg/piriclient/client.go b/pkg/piriclient/client.go index a5a963d..636f8fe 100644 --- a/pkg/piriclient/client.go +++ b/pkg/piriclient/client.go @@ -3,6 +3,7 @@ package piriclient import ( "context" "fmt" + "net/http" "net/url" "time" @@ -20,6 +21,12 @@ import ( "github.com/storacha/go-ucanto/did" ucanhttp "github.com/storacha/go-ucanto/transport/http" "github.com/storacha/go-ucanto/ucan" + "github.com/uptrace/opentelemetry-go-extra/otelzap" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" ) @@ -42,13 +49,21 @@ type Client struct { piriDID did.DID signer ucan.Signer connection uclient.Connection - logger *zap.Logger + logger *otelzap.Logger } // New creates a new Piri client. // The delegationFetcher is used to fetch delegation proofs on-demand for each request. func New(endpoint *url.URL, piriDID did.DID, signer ucan.Signer, logger *zap.Logger) (*Client, error) { - channel := ucanhttp.NewChannel(endpoint) + // otelhttp wraps the default transport so every outbound UCAN request + // becomes a child span of whatever span is active on the caller's ctx + // (typically piriclient.Allocate / .Accept / .ReplicaAllocate). This is + // also what propagates the W3C traceparent header downstream so piri + // nodes can continue the trace. + httpClient := &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport), + } + channel := ucanhttp.NewChannel(endpoint, ucanhttp.WithClient(httpClient)) conn, err := uclient.NewConnection(piriDID, channel) if err != nil { return nil, fmt.Errorf("creating connection: %w", err) @@ -61,7 +76,7 @@ func NewWithConnection(piriDID did.DID, signer ucan.Signer, conn uclient.Connect piriDID: piriDID, signer: signer, connection: conn, - logger: logger, + logger: otelzap.New(logger), } } @@ -108,14 +123,40 @@ func (c *Client) fetchDelegationOpts(ctx context.Context, fetcher DelegationFetc // Allocate sends a blob/allocate invocation to the piri node. // Returns the response data, the invocation that was sent, and the receipt from piri. func (c *Client) Allocate(ctx context.Context, req *AllocateRequest, fetcher DelegationFetcher) (*AllocateResponse, invocation.Invocation, receipt.AnyReceipt, error) { - // Fetch delegation fresh for each request + ctx, span := tracer.Start(ctx, "piriclient.Allocate", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("piri.did", c.piriDID.String()), + attribute.String("space.did", req.Space.String()), + attribute.Int64("blob.size", int64(req.Size)), + ), + ) + defer span.End() + + start := time.Now() + outcome := "ok" + defer func() { + labels := metric.WithAttributes(attribute.String("outcome", outcome)) + allocateDuration.Record(ctx, time.Since(start).Seconds(), labels) + allocateRequests.Add(ctx, 1, labels) + }() + + fail := func(err error, reason string) error { + outcome = reason + span.RecordError(err) + span.SetStatus(codes.Error, reason) + c.logger.Ctx(ctx).Error("allocate failed", + zap.String("reason", reason), + zap.Error(err), + ) + return err + } + opts, err := c.fetchDelegationOpts(ctx, fetcher) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fail(err, "fetch_delegation") } - // Create the invocation - // The resource (With) must be the piri node's DID for blob/allocate inv, err := blobcap.Allocate.Invoke( c.signer, c.piriDID, @@ -131,10 +172,9 @@ func (c *Client) Allocate(ctx context.Context, req *AllocateRequest, fetcher Del opts..., ) if err != nil { - return nil, nil, nil, fmt.Errorf("creating allocate invocation: %w", err) + return nil, nil, nil, fail(fmt.Errorf("creating allocate invocation: %w", err), "invocation_create") } - // Log invocation details proofLinks := inv.Proofs() blockCount := 0 for _, blkErr := range inv.Export() { @@ -149,52 +189,30 @@ func (c *Client) Allocate(ctx context.Context, req *AllocateRequest, fetcher Del zap.Int("proofLinks", len(proofLinks)), zap.Int("blocks", blockCount)) - // Execute the invocation resp, err := uclient.Execute(ctx, []invocation.Invocation{inv}, c.connection) if err != nil { - return nil, nil, nil, fmt.Errorf("executing allocate invocation: %w", err) + return nil, nil, nil, fail(fmt.Errorf("executing allocate invocation: %w", err), "execute") } - // Get the receipt rcptLink, ok := resp.Get(inv.Link()) if !ok { - return nil, nil, nil, fmt.Errorf("receipt not found for invocation") + return nil, nil, nil, fail(fmt.Errorf("receipt not found for invocation"), "receipt_missing") } - // Read the receipt using the any reader to avoid type issues anyReader := receipt.NewAnyReceiptReader(types.Converters...) anyRcpt, err := anyReader.Read(rcptLink, resp.Blocks()) if err != nil { - return nil, nil, nil, fmt.Errorf("reading receipt: %w", err) + return nil, nil, nil, fail(fmt.Errorf("reading receipt: %w", err), "receipt_read") } - // Check for error response okNode, errNode := result.Unwrap(anyRcpt.Out()) if errNode != nil { - // Try to extract error details - var errDetails string - if msgNode, lookupErr := errNode.LookupByString("message"); lookupErr == nil { - if msg, asErr := msgNode.AsString(); asErr == nil { - errDetails = msg - } - } - if errDetails == "" { - if nameNode, lookupErr := errNode.LookupByString("name"); lookupErr == nil { - if name, asErr := nameNode.AsString(); asErr == nil { - errDetails = name - } - } - } - if errDetails == "" { - errDetails = "unknown error" - } - return nil, nil, nil, fmt.Errorf("allocate failed: %s", errDetails) + return nil, nil, nil, fail(fmt.Errorf("allocate failed: %s", extractErrorDetails(errNode)), "receipt_error") } if okNode == nil { - return nil, nil, nil, fmt.Errorf("allocate returned nil result") + return nil, nil, nil, fail(fmt.Errorf("allocate returned nil result"), "receipt_empty") } - // Rebind to the typed receipt typedRcpt, err := receipt.Rebind[blobcap.AllocateOk, fdm.FailureModel]( anyRcpt, blobcap.AllocateOkType(), @@ -202,15 +220,15 @@ func (c *Client) Allocate(ctx context.Context, req *AllocateRequest, fetcher Del types.Converters..., ) if err != nil { - return nil, nil, nil, fmt.Errorf("rebinding receipt: %w", err) + return nil, nil, nil, fail(fmt.Errorf("rebinding receipt: %w", err), "receipt_rebind") } - // Extract the result allocateOk, failErr := result.Unwrap(typedRcpt.Out()) if (failErr != fdm.FailureModel{}) { - return nil, nil, nil, fmt.Errorf("allocate failed: %s", failErr.Message) + return nil, nil, nil, fail(fmt.Errorf("allocate failed: %s", failErr.Message), "result_failure") } + span.SetAttributes(attribute.Int64("blob.allocated_size", int64(allocateOk.Size))) return &AllocateResponse{ Size: allocateOk.Size, Address: allocateOk.Address, @@ -260,10 +278,38 @@ type AcceptResponse struct { // Accept sends a blob/accept invocation to the piri node. func (c *Client) Accept(ctx context.Context, req *AcceptRequest, fetcher DelegationFetcher) (*AcceptResponse, invocation.Invocation, receipt.AnyReceipt, error) { - // Fetch delegation fresh for each request + ctx, span := tracer.Start(ctx, "piriclient.Accept", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("piri.did", c.piriDID.String()), + attribute.String("space.did", req.Space.String()), + attribute.Int64("blob.size", int64(req.Size)), + ), + ) + defer span.End() + + start := time.Now() + outcome := "ok" + defer func() { + labels := metric.WithAttributes(attribute.String("outcome", outcome)) + acceptDuration.Record(ctx, time.Since(start).Seconds(), labels) + acceptRequests.Add(ctx, 1, labels) + }() + + fail := func(err error, reason string) error { + outcome = reason + span.RecordError(err) + span.SetStatus(codes.Error, reason) + c.logger.Ctx(ctx).Error("accept failed", + zap.String("reason", reason), + zap.Error(err), + ) + return err + } + opts, err := c.fetchDelegationOpts(ctx, fetcher) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fail(err, "fetch_delegation") } // Use WithNoExpiration so the invocation CID is deterministic and matches @@ -289,10 +335,9 @@ func (c *Client) Accept(ctx context.Context, req *AcceptRequest, fetcher Delegat opts..., ) if err != nil { - return nil, nil, nil, fmt.Errorf("creating accept invocation: %w", err) + return nil, nil, nil, fail(fmt.Errorf("creating accept invocation: %w", err), "invocation_create") } - // Log invocation details acceptProofLinks := inv.Proofs() acceptBlockCount := 0 for _, blkErr := range inv.Export() { @@ -307,48 +352,28 @@ func (c *Client) Accept(ctx context.Context, req *AcceptRequest, fetcher Delegat zap.Int("proofLinks", len(acceptProofLinks)), zap.Int("blocks", acceptBlockCount)) - // Execute the invocation resp, err := uclient.Execute(ctx, []invocation.Invocation{inv}, c.connection) if err != nil { - return nil, nil, nil, fmt.Errorf("executing accept invocation: %w", err) + return nil, nil, nil, fail(fmt.Errorf("executing accept invocation: %w", err), "execute") } - // Get the receipt rcptLink, ok := resp.Get(inv.Link()) if !ok { - return nil, nil, nil, fmt.Errorf("receipt not found for invocation") + return nil, nil, nil, fail(fmt.Errorf("receipt not found for invocation"), "receipt_missing") } - // Read the receipt using the any reader anyReader := receipt.NewAnyReceiptReader(types.Converters...) anyRcpt, err := anyReader.Read(rcptLink, resp.Blocks()) if err != nil { - return nil, nil, nil, fmt.Errorf("reading receipt: %w", err) + return nil, nil, nil, fail(fmt.Errorf("reading receipt: %w", err), "receipt_read") } - // Check for error response okNode, errNode := result.Unwrap(anyRcpt.Out()) if errNode != nil { - var errDetails string - if msgNode, lookupErr := errNode.LookupByString("message"); lookupErr == nil { - if msg, asErr := msgNode.AsString(); asErr == nil { - errDetails = msg - } - } - if errDetails == "" { - if nameNode, lookupErr := errNode.LookupByString("name"); lookupErr == nil { - if name, asErr := nameNode.AsString(); asErr == nil { - errDetails = name - } - } - } - if errDetails == "" { - errDetails = "unknown error" - } - return nil, nil, nil, fmt.Errorf("accept failed: %s", errDetails) + return nil, nil, nil, fail(fmt.Errorf("accept failed: %s", extractErrorDetails(errNode)), "receipt_error") } if okNode == nil { - return nil, nil, nil, fmt.Errorf("accept returned nil result") + return nil, nil, nil, fail(fmt.Errorf("accept returned nil result"), "receipt_empty") } // Extract the site link from the ok node @@ -419,9 +444,39 @@ type ReplicaAllocateResponse struct { // Returns the response data, the invocation that was sent, and the receipt from // piri. It returns an error if the receipt contains a failure result. func (c *Client) ReplicaAllocate(ctx context.Context, req *ReplicaAllocateRequest, fetcher DelegationFetcher) (*ReplicaAllocateResponse, invocation.Invocation, receipt.AnyReceipt, error) { + ctx, span := tracer.Start(ctx, "piriclient.ReplicaAllocate", + trace.WithSpanKind(trace.SpanKindClient), + trace.WithAttributes( + attribute.String("piri.did", c.piriDID.String()), + attribute.String("space.did", req.Space.String()), + attribute.Int64("blob.size", int64(req.Size)), + attribute.String("site.cid", req.Site.Link().String()), + ), + ) + defer span.End() + + start := time.Now() + outcome := "ok" + defer func() { + labels := metric.WithAttributes(attribute.String("outcome", outcome)) + replicaAllocateDuration.Record(ctx, time.Since(start).Seconds(), labels) + replicaAllocateRequests.Add(ctx, 1, labels) + }() + + fail := func(err error, reason string) error { + outcome = reason + span.RecordError(err) + span.SetStatus(codes.Error, reason) + c.logger.Ctx(ctx).Error("replica allocate failed", + zap.String("reason", reason), + zap.Error(err), + ) + return err + } + opts, err := c.fetchDelegationOpts(ctx, fetcher) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, fail(err, "fetch_delegation") } // We set a reasonably large expiration as replication nodes use the @@ -446,16 +501,16 @@ func (c *Client) ReplicaAllocate(ctx context.Context, req *ReplicaAllocateReques opts..., ) if err != nil { - return nil, nil, nil, fmt.Errorf("creating replica allocate invocation: %w", err) + return nil, nil, nil, fail(fmt.Errorf("creating replica allocate invocation: %w", err), "invocation_create") } // attach the location commitment to the allocation invocation for b, err := range req.Site.Blocks() { if err != nil { - return nil, nil, nil, fmt.Errorf("iterating location commitment blocks: %w", err) + return nil, nil, nil, fail(fmt.Errorf("iterating location commitment blocks: %w", err), "site_iterate") } if err := inv.Attach(b); err != nil { - return nil, nil, nil, fmt.Errorf("attaching location commitment block: %w", err) + return nil, nil, nil, fail(fmt.Errorf("attaching location commitment block: %w", err), "site_attach") } } @@ -466,43 +521,44 @@ func (c *Client) ReplicaAllocate(ctx context.Context, req *ReplicaAllocateReques resp, err := uclient.Execute(ctx, []invocation.Invocation{inv}, c.connection) if err != nil { - return nil, nil, nil, fmt.Errorf("executing replica allocate invocation: %w", err) + return nil, nil, nil, fail(fmt.Errorf("executing replica allocate invocation: %w", err), "execute") } rcptLink, ok := resp.Get(inv.Link()) if !ok { - return nil, nil, nil, fmt.Errorf("receipt not found for invocation") + return nil, nil, nil, fail(fmt.Errorf("receipt not found for invocation"), "receipt_missing") } reader := receipt.NewAnyReceiptReader(types.Converters...) rcpt, err := reader.Read(rcptLink, resp.Blocks()) if err != nil { - return nil, nil, nil, fmt.Errorf("reading receipt: %w", err) + return nil, nil, nil, fail(fmt.Errorf("reading receipt: %w", err), "receipt_read") } o, x := result.Unwrap(rcpt.Out()) if x != nil { - return nil, nil, nil, fmt.Errorf("allocate failed: %s", fdm.Bind(x).Message) + return nil, nil, nil, fail(fmt.Errorf("allocate failed: %s", fdm.Bind(x).Message), "receipt_error") } allocateOk, err := ipld.Rebind[blobreplicacap.AllocateOk](o, blobreplicacap.AllocateOkType(), types.Converters...) if err != nil { - return nil, nil, nil, fmt.Errorf("rebinding receipt: %w", err) + return nil, nil, nil, fail(fmt.Errorf("rebinding receipt: %w", err), "receipt_rebind") } if allocateOk.Site.UcanAwait.Selector != blobreplicacap.AllocateSiteSelector { - return nil, nil, nil, fmt.Errorf("unexpected site selector: %s", allocateOk.Site.UcanAwait.Selector) + return nil, nil, nil, fail(fmt.Errorf("unexpected site selector: %s", allocateOk.Site.UcanAwait.Selector), "site_selector") } br, err := blockstore.NewBlockReader(blockstore.WithBlocksIterator(rcpt.Blocks())) if err != nil { - return nil, nil, nil, fmt.Errorf("creating block reader: %w", err) + return nil, nil, nil, fail(fmt.Errorf("creating block reader: %w", err), "block_reader") } transfer, err := invocation.NewInvocationView(allocateOk.Site.UcanAwait.Link, br) if err != nil { - return nil, nil, nil, fmt.Errorf("creating transfer invocation view: %w", err) + return nil, nil, nil, fail(fmt.Errorf("creating transfer invocation view: %w", err), "transfer_view") } + span.SetAttributes(attribute.Int64("blob.allocated_size", int64(allocateOk.Size))) return &ReplicaAllocateResponse{ Size: allocateOk.Size, Site: allocateOk.Site, diff --git a/pkg/piriclient/telemetry.go b/pkg/piriclient/telemetry.go new file mode 100644 index 0000000..861891a --- /dev/null +++ b/pkg/piriclient/telemetry.go @@ -0,0 +1,80 @@ +package piriclient + +import ( + "github.com/ipld/go-ipld-prime/datamodel" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" +) + +const instrumentationName = "github.com/storacha/sprue/pkg/piriclient" + +var ( + tracer = otel.Tracer(instrumentationName) + meter = otel.Meter(instrumentationName) + + allocateDuration metric.Float64Histogram + allocateRequests metric.Int64Counter + acceptDuration metric.Float64Histogram + acceptRequests metric.Int64Counter + replicaAllocateDuration metric.Float64Histogram + replicaAllocateRequests metric.Int64Counter +) + +func init() { + var err error + if allocateDuration, err = meter.Float64Histogram( + "piriclient.allocate.duration", + metric.WithUnit("s"), + metric.WithDescription("Duration of blob/allocate invocations to a Piri node."), + ); err != nil { + otel.Handle(err) + } + if allocateRequests, err = meter.Int64Counter( + "piriclient.allocate.requests", + metric.WithDescription("Count of blob/allocate invocations, labeled by outcome."), + ); err != nil { + otel.Handle(err) + } + if acceptDuration, err = meter.Float64Histogram( + "piriclient.accept.duration", + metric.WithUnit("s"), + metric.WithDescription("Duration of blob/accept invocations to a Piri node."), + ); err != nil { + otel.Handle(err) + } + if acceptRequests, err = meter.Int64Counter( + "piriclient.accept.requests", + metric.WithDescription("Count of blob/accept invocations, labeled by outcome."), + ); err != nil { + otel.Handle(err) + } + if replicaAllocateDuration, err = meter.Float64Histogram( + "piriclient.replica_allocate.duration", + metric.WithUnit("s"), + metric.WithDescription("Duration of blob/replica/allocate invocations to a Piri node."), + ); err != nil { + otel.Handle(err) + } + if replicaAllocateRequests, err = meter.Int64Counter( + "piriclient.replica_allocate.requests", + metric.WithDescription("Count of blob/replica/allocate invocations, labeled by outcome."), + ); err != nil { + otel.Handle(err) + } +} + +// extractErrorDetails pulls a human-readable message out of a receipt error +// node, falling back through "message" → "name" → a static sentinel. +func extractErrorDetails(errNode datamodel.Node) string { + if msgNode, lookupErr := errNode.LookupByString("message"); lookupErr == nil { + if msg, asErr := msgNode.AsString(); asErr == nil && msg != "" { + return msg + } + } + if nameNode, lookupErr := errNode.LookupByString("name"); lookupErr == nil { + if name, asErr := nameNode.AsString(); asErr == nil && name != "" { + return name + } + } + return "unknown error" +} diff --git a/pkg/service/handlers/access_authorize.go b/pkg/service/handlers/access_authorize.go index 81be8e3..917d582 100644 --- a/pkg/service/handlers/access_authorize.go +++ b/pkg/service/handlers/access_authorize.go @@ -45,8 +45,9 @@ var ( func WithAccessAuthorizeMethod(serverCfg config.ServerConfig, id *identity.Identity, mailer mailer.Mailer, logger *zap.Logger) server.Option { return server.WithServiceMethod( access.AuthorizeAbility, - server.Provide( + ProvideTraced( access.Authorize, + access.AuthorizeAbility, AccessAuthorizeHandler(serverCfg, id, mailer, logger), ), ) diff --git a/pkg/service/handlers/access_claim.go b/pkg/service/handlers/access_claim.go index a7db70f..889498d 100644 --- a/pkg/service/handlers/access_claim.go +++ b/pkg/service/handlers/access_claim.go @@ -35,8 +35,9 @@ var ErrInvalidClaimAudience = errors.New(InvalidClaimAudienceErrorName, "invalid func WithAccessClaimMethod(id *identity.Identity, delegationStore delegation_store.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( access.ClaimAbility, - server.Provide( + ProvideTraced( access.Claim, + access.ClaimAbility, AccessClaimHandler(id, delegationStore, logger), ), ) diff --git a/pkg/service/handlers/access_confirm.go b/pkg/service/handlers/access_confirm.go index 53f58fe..4b781c7 100644 --- a/pkg/service/handlers/access_confirm.go +++ b/pkg/service/handlers/access_confirm.go @@ -32,8 +32,9 @@ const InvalidAccessConfirmDelegationErrorName = "InvalidAccessConfirmDelegation" func WithAccessConfirmMethod(id *identity.Identity, delegationStore delegation_store.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( access.ConfirmAbility, - server.Provide( + ProvideTraced( access.Confirm, + access.ConfirmAbility, AccessConfirmHandler(id, delegationStore, logger), ), ) diff --git a/pkg/service/handlers/access_delegate.go b/pkg/service/handlers/access_delegate.go index 9eb0f09..f999478 100644 --- a/pkg/service/handlers/access_delegate.go +++ b/pkg/service/handlers/access_delegate.go @@ -32,8 +32,9 @@ const ( func WithAccessDelegateMethod(delegationStore delegation_store.Store, provisioningSvc *provisioning.Service, logger *zap.Logger) server.Option { return server.WithServiceMethod( access.DelegateAbility, - server.Provide( + ProvideTraced( access.Delegate, + access.DelegateAbility, AccessDelegateHandler(delegationStore, provisioningSvc, logger), ), ) diff --git a/pkg/service/handlers/admin_provider_deregister.go b/pkg/service/handlers/admin_provider_deregister.go index bee8cb4..1bdbb1d 100644 --- a/pkg/service/handlers/admin_provider_deregister.go +++ b/pkg/service/handlers/admin_provider_deregister.go @@ -21,8 +21,9 @@ import ( func WithAdminProviderDeregisterMethod(id *identity.Identity, providerStore storageprovider.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( provider.DeregisterAbility, - server.Provide( + ProvideTraced( provider.Deregister, + provider.DeregisterAbility, AdminProviderDeregisterHandler(id, providerStore, logger), ), ) diff --git a/pkg/service/handlers/admin_provider_list.go b/pkg/service/handlers/admin_provider_list.go index 5534a5a..bff7c34 100644 --- a/pkg/service/handlers/admin_provider_list.go +++ b/pkg/service/handlers/admin_provider_list.go @@ -22,8 +22,9 @@ import ( func WithAdminProviderListMethod(id *identity.Identity, providerStore storageprovider.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( provider.ListAbility, - server.Provide( + ProvideTraced( provider.List, + provider.ListAbility, AdminProviderListHandler(id, providerStore, logger), ), ) diff --git a/pkg/service/handlers/admin_provider_register.go b/pkg/service/handlers/admin_provider_register.go index e3d4536..c0e127b 100644 --- a/pkg/service/handlers/admin_provider_register.go +++ b/pkg/service/handlers/admin_provider_register.go @@ -30,8 +30,9 @@ var ( func WithAdminProviderRegisterMethod(id *identity.Identity, providerStore storageprovider.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( provider.RegisterAbility, - server.Provide( + ProvideTraced( provider.Register, + provider.RegisterAbility, AdminProviderRegisterHandler(id, providerStore, logger), ), ) diff --git a/pkg/service/handlers/admin_provider_weight_set.go b/pkg/service/handlers/admin_provider_weight_set.go index baa7373..12831f0 100644 --- a/pkg/service/handlers/admin_provider_weight_set.go +++ b/pkg/service/handlers/admin_provider_weight_set.go @@ -21,8 +21,9 @@ import ( func WithAdminProviderWeightSetMethod(id *identity.Identity, providerStore storageprovider.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( provider.WeightSetAbility, - server.Provide( + ProvideTraced( provider.WeightSet, + provider.WeightSetAbility, AdminProviderWeightSetHandler(id, providerStore, logger), ), ) diff --git a/pkg/service/handlers/filecoin_offer.go b/pkg/service/handlers/filecoin_offer.go index 1b6a3d8..dd2b8e0 100644 --- a/pkg/service/handlers/filecoin_offer.go +++ b/pkg/service/handlers/filecoin_offer.go @@ -20,8 +20,9 @@ import ( func WithFilecoinOfferMethod(logger *zap.Logger) server.Option { return server.WithServiceMethod( filecoincap.OfferAbility, - server.Provide( + ProvideTraced( filecoincap.Offer, + filecoincap.OfferAbility, func(ctx context.Context, cap ucan.Capability[filecoincap.OfferCaveats], inv invocation.Invocation, diff --git a/pkg/service/handlers/provider_add.go b/pkg/service/handlers/provider_add.go index 04bb46a..3931339 100644 --- a/pkg/service/handlers/provider_add.go +++ b/pkg/service/handlers/provider_add.go @@ -34,8 +34,9 @@ const ( func WithProviderAddMethod(deploymentCfg config.DeploymentConfig, provisioningSvc *provisioning.Service, billingSvc *billing.Service, logger *zap.Logger) server.Option { return server.WithServiceMethod( provider.AddAbility, - server.Provide( + ProvideTraced( provider.Add, + provider.AddAbility, ProviderAddHandler(deploymentCfg, provisioningSvc, billingSvc, logger), ), ) diff --git a/pkg/service/handlers/space_blob_add.go b/pkg/service/handlers/space_blob_add.go index 41adb65..b3675ae 100644 --- a/pkg/service/handlers/space_blob_add.go +++ b/pkg/service/handlers/space_blob_add.go @@ -45,8 +45,9 @@ import ( func WithSpaceBlobAddMethod(id *identity.Identity, router *routing.Service, nodeProvider piriclient.Provider, agentStore agent.Store, blobRegistry blobregistry.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( spaceblobcap.AddAbility, - server.Provide( + ProvideTraced( spaceblobcap.Add, + spaceblobcap.AddAbility, SpaceBlobAddHandler(id, router, nodeProvider, agentStore, blobRegistry, logger), ), ) diff --git a/pkg/service/handlers/space_blob_list.go b/pkg/service/handlers/space_blob_list.go index ad85c89..272850e 100644 --- a/pkg/service/handlers/space_blob_list.go +++ b/pkg/service/handlers/space_blob_list.go @@ -23,7 +23,7 @@ import ( func WithSpaceBlobListMethod(blobRegistry blobregistry.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( blob.ListAbility, - server.Provide(blob.List, SpaceBlobListHandler(blobRegistry, logger)), + ProvideTraced(blob.List, blob.ListAbility, SpaceBlobListHandler(blobRegistry, logger)), ) } diff --git a/pkg/service/handlers/space_blob_replicate.go b/pkg/service/handlers/space_blob_replicate.go index 398365f..7be2ee9 100644 --- a/pkg/service/handlers/space_blob_replicate.go +++ b/pkg/service/handlers/space_blob_replicate.go @@ -69,8 +69,9 @@ func WithSpaceBlobReplicateMethod( ) server.Option { return server.WithServiceMethod( spaceblobcap.ReplicateAbility, - server.Provide( + ProvideTraced( spaceblobcap.Replicate, + spaceblobcap.ReplicateAbility, SpaceBlobReplicateHandler(cfg, id, router, blobRegistry, replicaStore, agentStore, storageNode, logger), ), ) diff --git a/pkg/service/handlers/space_index_add.go b/pkg/service/handlers/space_index_add.go index 7955913..4712730 100644 --- a/pkg/service/handlers/space_index_add.go +++ b/pkg/service/handlers/space_index_add.go @@ -65,8 +65,9 @@ func extractRetrievalAuth(inv invocation.Invocation) (delegation.Delegation, err func WithSpaceIndexAddMethod(provisioningSvc *provisioning.Service, blobRegistry blobregistry.Store, indexerClient *indexerclient.Client, logger *zap.Logger) server.Option { return server.WithServiceMethod( spaceindexcap.AddAbility, - server.Provide( + ProvideTraced( spaceindexcap.Add, + spaceindexcap.AddAbility, SpaceIndexAddHandler(provisioningSvc, blobRegistry, indexerClient, logger), ), ) diff --git a/pkg/service/handlers/trace.go b/pkg/service/handlers/trace.go new file mode 100644 index 0000000..652bde2 --- /dev/null +++ b/pkg/service/handlers/trace.go @@ -0,0 +1,70 @@ +package handlers + +import ( + "context" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + "github.com/storacha/go-ucanto/core/invocation" + "github.com/storacha/go-ucanto/core/ipld" + "github.com/storacha/go-ucanto/core/receipt/fx" + "github.com/storacha/go-ucanto/core/result" + "github.com/storacha/go-ucanto/core/result/failure" + "github.com/storacha/go-ucanto/server" + "github.com/storacha/go-ucanto/ucan" + "github.com/storacha/go-ucanto/validator" +) + +const handlerInstrumentationName = "github.com/storacha/sprue/pkg/service/handlers" + +var handlerTracer = otel.Tracer(handlerInstrumentationName) + +// Traced wraps a server.HandlerFunc so that each invocation opens a child +// span named "ucan." carrying the UCAN resource, issuer, audience, +// and invocation CID as attributes. The span records the transport-level +// error returned by the handler; domain-level failures (encoded in the +// result.Result failure branch) are left for the caller to interpret. +func Traced[C any, O ipld.Builder, X failure.IPLDBuilderFailure]( + ability string, + handler server.HandlerFunc[C, O, X], +) server.HandlerFunc[C, O, X] { + return func( + ctx context.Context, + cap ucan.Capability[C], + inv invocation.Invocation, + ictx server.InvocationContext, + ) (result.Result[O, X], fx.Effects, error) { + ctx, span := handlerTracer.Start(ctx, "ucan."+ability, + trace.WithSpanKind(trace.SpanKindServer), + trace.WithAttributes( + attribute.String("ucan.ability", ability), + attribute.String("ucan.resource", cap.With()), + attribute.String("ucan.issuer", inv.Issuer().DID().String()), + attribute.String("ucan.audience", inv.Audience().DID().String()), + attribute.String("invocation.cid", inv.Link().String()), + ), + ) + defer span.End() + + res, effects, err := handler(ctx, cap, inv, ictx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "handler error") + } + return res, effects, err + } +} + +// ProvideTraced is a drop-in replacement for server.Provide that wraps the +// handler in Traced. The ability string is passed explicitly so the span +// name is built without reflecting over the CapabilityParser. +func ProvideTraced[C any, O ipld.Builder, X failure.IPLDBuilderFailure]( + capability validator.CapabilityParser[C], + ability string, + handler server.HandlerFunc[C, O, X], +) server.ServiceMethod[O, failure.IPLDBuilderFailure] { + return server.Provide(capability, Traced(ability, handler)) +} diff --git a/pkg/service/handlers/ucan_conclude.go b/pkg/service/handlers/ucan_conclude.go index 439c1b1..8e573a0 100644 --- a/pkg/service/handlers/ucan_conclude.go +++ b/pkg/service/handlers/ucan_conclude.go @@ -44,8 +44,9 @@ type ConclusionHandler struct { func WithUCANConcludeMethod(id *identity.Identity, agentStore agent.Store, handlers map[ucan.Ability]ConclusionHandlerFunc, logger *zap.Logger) server.Option { return server.WithServiceMethod( ucancap.ConcludeAbility, - server.Provide( + ProvideTraced( ucancap.Conclude, + ucancap.ConcludeAbility, UCANConcludeHandler(id, agentStore, handlers, logger), ), ) diff --git a/pkg/service/handlers/upload_add.go b/pkg/service/handlers/upload_add.go index af82f1e..add85be 100644 --- a/pkg/service/handlers/upload_add.go +++ b/pkg/service/handlers/upload_add.go @@ -26,7 +26,7 @@ const InvalidSpaceErrorName = "InvalidSpace" func WithUploadAddMethod(uploadStore upload_store.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( upload.AddAbility, - server.Provide(upload.Add, UploadAddHandler(uploadStore, logger)), + ProvideTraced(upload.Add, upload.AddAbility, UploadAddHandler(uploadStore, logger)), ) } diff --git a/pkg/service/handlers/upload_list.go b/pkg/service/handlers/upload_list.go index 43bfcdf..40995bb 100644 --- a/pkg/service/handlers/upload_list.go +++ b/pkg/service/handlers/upload_list.go @@ -23,7 +23,7 @@ import ( func WithUploadListMethod(uploadStore upload_store.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( upload.AddAbility, - server.Provide(upload.Add, UploadAddHandler(uploadStore, logger)), + ProvideTraced(upload.Add, upload.AddAbility, UploadAddHandler(uploadStore, logger)), ) } diff --git a/pkg/service/handlers/upload_shard_list.go b/pkg/service/handlers/upload_shard_list.go index 6d7795f..e83db04 100644 --- a/pkg/service/handlers/upload_shard_list.go +++ b/pkg/service/handlers/upload_shard_list.go @@ -25,7 +25,7 @@ import ( func WithUploadShardListMethod(uploadStore upload_store.Store, logger *zap.Logger) server.Option { return server.WithServiceMethod( shard.ListAbility, - server.Provide(shard.List, UploadShardListHandler(uploadStore, logger)), + ProvideTraced(shard.List, shard.ListAbility, UploadShardListHandler(uploadStore, logger)), ) } diff --git a/pkg/service/service.go b/pkg/service/service.go index 591c18b..55aefc8 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -23,6 +23,10 @@ import ( ucanhttp "github.com/storacha/go-ucanto/transport/http" "github.com/storacha/go-ucanto/ucan" "github.com/storacha/go-ucanto/validator" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/storacha/sprue/pkg/identity" @@ -35,6 +39,10 @@ import ( delegation_store "github.com/storacha/sprue/pkg/store/delegation" ) +const instrumentationName = "github.com/storacha/sprue/pkg/service" + +var tracer = otel.Tracer(instrumentationName) + // Service implements the sprue upload service logic. type Service struct { identity *identity.Identity @@ -86,31 +94,51 @@ func (s *Service) createUCANServer() (server.ServerView[server.Service], error) func (s *Service) HandleUCANRequest(c echo.Context) error { r := c.Request() + ctx, span := tracer.Start(r.Context(), "ucan.dispatch", + trace.WithSpanKind(trace.SpanKindServer), + ) + defer span.End() + inBytes, inMsg, inIdx, err := decodeAndIndex(r.Body) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "decode") return fmt.Errorf("decoding and indexing incoming agent message: %w", err) } r.Body.Close() - err = s.agentStore.Write(r.Context(), inMsg, inIdx, inBytes) - if err != nil { + span.SetAttributes( + attribute.Int("ucan.invocations", len(inIdx)), + attribute.Int("ucan.request_bytes", len(inBytes)), + ) + + if err := s.agentStore.Write(ctx, inMsg, inIdx, inBytes); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "agent_store_write_in") return fmt.Errorf("writing incoming agent message to agent store: %w", err) } - res, err := s.ucanServer.Request(r.Context(), ucanhttp.NewRequest(bytes.NewReader(inBytes), r.Header)) + res, err := s.ucanServer.Request(ctx, ucanhttp.NewRequest(bytes.NewReader(inBytes), r.Header)) if err != nil { s.logger.Error("UCAN request error", zap.Error(err)) + span.RecordError(err) + span.SetStatus(codes.Error, "ucan_request") return fmt.Errorf("handling UCAN request: %w", err) } outBytes, outMsg, outIdx, err := decodeAndIndex(res.Body()) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "decode_response") return fmt.Errorf("decoding and indexing outgoing agent message: %w", err) } res.Body().Close() - err = s.agentStore.Write(r.Context(), outMsg, outIdx, outBytes) - if err != nil { + span.SetAttributes(attribute.Int("ucan.response_bytes", len(outBytes))) + + if err := s.agentStore.Write(ctx, outMsg, outIdx, outBytes); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "agent_store_write_out") return fmt.Errorf("writing outgoing agent message to agent store: %w", err) }