Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Following is the supported API format for kafka encode:
readTimeout: timeout (in seconds) for read operation performed by the Writer
batchBytes: limit the maximum size of a request in bytes before being sent to a partition
batchSize: limit on how many messages will be buffered before being sent to a partition
compression: compression codec: none (default), gzip, snappy, lz4, zstd
tls: TLS client configuration (optional)
insecureSkipVerify: skip client verifying the server's certificate chain and host name
caCertPath: path to the CA certificate
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.67.5
github.com/prometheus/common v0.67.6-0.20260224092343-e4c38a0aea47
github.com/segmentio/kafka-go v0.4.50
github.com/sirupsen/logrus v1.9.4
github.com/spf13/cobra v1.10.2
Expand Down Expand Up @@ -49,7 +49,7 @@ require (
)

require (
github.com/golang-jwt/jwt/v5 v5.3.0 // indirect
github.com/golang-jwt/jwt/v5 v5.3.1 // indirect
github.com/klauspost/crc32 v1.3.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
Expand Down Expand Up @@ -93,7 +93,7 @@ require (
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/libp2p/go-reuseport v0.4.0 // indirect
github.com/mailru/easyjson v0.9.0 // indirect
Expand All @@ -114,7 +114,7 @@ require (
github.com/pion/logging v0.2.3 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/prometheus/prometheus v0.304.0 // indirect
github.com/rs/xid v1.6.0 // indirect
github.com/sagikazarmark/locafero v0.11.0 // indirect
Expand Down Expand Up @@ -157,3 +157,5 @@ require (
)

replace github.com/vmware/go-ipfix => github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101

replace github.com/prometheus/client_golang => github.com/jpinsonneau/client_golang v0.0.0-20260414103245-2444fefa0bfd
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9L
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
Expand Down Expand Up @@ -152,12 +152,14 @@ github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101 h1:tpaHjydMAy2MTukK
github.com/jotak/go-ipfix v0.0.0-20250708115123-407c539ea101/go.mod h1:GgFbcmEGqMQfA7jDC9UVLKAelNh2sy1jsxyV7Tor3Ig=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jpinsonneau/client_golang v0.0.0-20260414103245-2444fefa0bfd h1:HVClie6aU/oB5rIxok6DLs3f1kY8ilHwR4ch7tEATjw=
github.com/jpinsonneau/client_golang v0.0.0-20260414103245-2444fefa0bfd/go.mod h1:9OehNLjrozMrOntYafW9bVteeRJIVCvxRLSAb3t08h8=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
Expand Down Expand Up @@ -245,14 +247,12 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4=
github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/prometheus/common v0.67.6-0.20260224092343-e4c38a0aea47 h1:DXz2mpU9UOivlNi73hwwVm7KSYGImaPDdYsniI6uOdE=
github.com/prometheus/common v0.67.6-0.20260224092343-e4c38a0aea47/go.mod h1:6wuJRcHcgBM1w2vIgTuCW7XSzlfCat/lS3C39U02oDA=
github.com/prometheus/procfs v0.20.1 h1:XwbrGOIplXW/AU3YhIhLODXMJYyC1isLFfYCsTEycfc=
github.com/prometheus/procfs v0.20.1/go.mod h1:o9EMBZGRyvDrSPH1RqdxhojkuXstoe4UlK79eF5TGGo=
github.com/prometheus/prometheus v0.304.0 h1:otXBqfF7bbTcW7IrXrB6HMjo4dThQbayCPFr2yTlqrQ=
github.com/prometheus/prometheus v0.304.0/go.mod h1:ioGx2SGKTY+fLnJSQCdTHqARVldGNS8OlIe3kvp98so=
github.com/prometheus/sigv4 v0.1.2 h1:R7570f8AoM5YnTUPFm3mjZH5q2k4D+I/phCWvZ4PXG8=
Expand Down
29 changes: 9 additions & 20 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,44 +97,31 @@ func (e *Prometheus) ProcessAggHist(m interface{}, _ string, labels map[string]s
return nil
}

func (e *Prometheus) GetCacheEntry(entryLabels map[string]string, m interface{}) interface{} {
// In prom_encode, the metrics cache just contains cleanup callbacks
switch mv := m.(type) {
case *prometheus.CounterVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.GaugeVec:
return func() { mv.Delete(entryLabels) }
case *prometheus.HistogramVec:
return func() { mv.Delete(entryLabels) }
}
func (e *Prometheus) GetCacheEntry(_ map[string]string, _ interface{}) interface{} {
// With Vec-native TTL, no cache entry is needed for Prometheus
return nil
}

// callback function from lru cleanup
func (e *Prometheus) Cleanup(cleanupFunc interface{}) {
cleanupFunc.(func())()
}

func (e *Prometheus) addCounter(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
counter := prometheus.NewCounterVec(prometheus.CounterOpts{Name: fullMetricName, Help: mInfo.Help, TTL: e.metricCommon.expiryTime}, mInfo.TargetLabels())
e.metricCommon.AddCounter(fullMetricName, counter, mInfo)
return counter
}

func (e *Prometheus) addGauge(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: fullMetricName, Help: mInfo.Help, TTL: e.metricCommon.expiryTime}, mInfo.TargetLabels())
e.metricCommon.AddGauge(fullMetricName, gauge, mInfo)
return gauge
}

func (e *Prometheus) addHistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help, TTL: e.metricCommon.expiryTime}, mInfo.TargetLabels())
e.metricCommon.AddHist(fullMetricName, histogram, mInfo)
return histogram
}

func (e *Prometheus) addAgghistogram(fullMetricName string, mInfo *metrics.Preprocessed) prometheus.Collector {
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help}, mInfo.TargetLabels())
agghistogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{Name: fullMetricName, Help: mInfo.Help, TTL: e.metricCommon.expiryTime}, mInfo.TargetLabels())
e.metricCommon.AddAggHist(fullMetricName, agghistogram, mInfo)
return agghistogram
}
Expand Down Expand Up @@ -311,12 +298,14 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
w.server = promserver.StartServerAsync(cfg.PromConnectionInfo, params.Name, registry)
}

metricCommon := NewMetricsCommonStruct(opMetrics, cfg.MaxMetrics, params.Name, expiryTime, w.Cleanup)
metricCommon := NewMetricsCommonStructWithVecTTL(opMetrics, cfg.MaxMetrics, params.Name, expiryTime)
w.metricCommon = metricCommon

// Init metrics
w.resetRegistry()

w.metricCommon.StartCleanupLoop()

return w, nil
}

Expand Down
Loading
Loading