Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@

LoongCollector 提供了指标接口,可以方便地为插件增加一些自监控指标,目前支持Counter,Gauge,String,Latency等类型。

接口
接口及实现

<https://github.com/alibaba/loongcollector/blob/main/pkg/pipeline/self_metric.go>

实现:

<https://github.com/alibaba/loongcollector/blob/main/pkg/helper/self_metrics_vector_imp.go>
<https://github.com/alibaba/loongcollector/blob/main/pkg/selfmonitor/metrics_vector_imp.go>

用户使用时需要引入pkg/helper包:

```go
import (
"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/selfmonitor"
)
```

Expand All @@ -31,20 +27,20 @@ type ProcessorRateLimit struct {
}
```

创建指标时,需要将其注册到 LoongCollector Context 的 MetricRecord 中,以便 LoongCollector 能够采集上报数据,在插件的Init方法中,调用context 的 GetMetricRecord()方法来获取MetricRecord,然后调用helper.New**XXX**MetricAndRegister函数去注册一个指标,例如:
创建指标时,需要将其注册到 LoongCollector Context 的 MetricRecord 中,以便 LoongCollector 能够采集上报数据,在插件的Init方法中,调用context 的 GetMetricRecord()方法来获取MetricRecord,然后调用selfmonitor.New**XXX**MetricAndRegister函数去注册一个指标,例如:

```go
metricsRecord := p.context.GetMetricRecord()
p.limitMetric = helper.NewCounterMetricAndRegister(metricsRecord, fmt.Sprintf("%v_limited", pluginType))
p.processedMetric = helper.NewCounterMetricAndRegister(metricsRecord, fmt.Sprintf("%v_processed", pluginType))
p.limitMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, fmt.Sprintf("%v_limited", pluginType))
p.processedMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, fmt.Sprintf("%v_processed", pluginType))
```

用户在声明一个Metric时可以还额外注入一些插件级别的静态Label,这是一个可选参数,例如flusher_http就把RemoteURL等配置进行上报:

```go
metricsRecord := f.context.GetMetricRecord()
metricLabels := f.buildLabels()
f.matchedEvents = helper.NewCounterMetricAndRegister(metricsRecord, "http_flusher_matched_events", metricLabels...)
f.matchedEvents = selfmonitor.NewCounterMetricAndRegister(metricsRecord, "http_flusher_matched_events", metricLabels...)
```

## 指标打点
Expand All @@ -70,15 +66,20 @@ sc.lastBinLogMetric.Set(string(r.NextLogName))

## 指标上报

LoongCollector 会自动采集所有注册的指标,默认采集间隔为60s,然后通过default_flusher上报,数据格式为LogGroup,格式如下
LoongCollector 会自动采集所有注册的指标,默认采集间隔为60s,然后通过c++流水线上报,大致格式如下

```json
{"Logs":[{"Time":0,"Contents":[{"Key":"http_flusher_matched_events","Value":"2.0000"},{"Key":"__name__","Value":"http_flusher_matched_events"},{"Key":"RemoteURL","Value":"http://testeof.com/write"},{"Key":"db","Value":"%{metadata.db}"},{"Key":"flusher_http_id","Value":"0"},{"Key":"project","Value":"p"},{"Key":"config_name","Value":"c"},{"Key":"plugins","Value":""},{"Key":"category","Value":"p"},{"Key":"source_ip","Value":"100.80.230.110"}]},{"Time":0,"Contents":[{"Key":"http_flusher_unmatched_events","Value":"0.0000"},{"Key":"__name__","Value":"http_flusher_unmatched_events"},{"Key":"db","Value":"%{metadata.db}"},{"Key":"flusher_http_id","Value":"0"},{"Key":"RemoteURL","Value":"http://testeof.com/write"},{"Key":"project","Value":"p"},{"Key":"config_name","Value":"c"},{"Key":"plugins","Value":""},{"Key":"category","Value":"p"},{"Key":"source_ip","Value":"100.80.230.110"}]},{"Time":0,"Contents":[{"Key":"http_flusher_dropped_events","Value":"0.0000"},{"Key":"__name__","Value":"http_flusher_dropped_events"},{"Key":"RemoteURL","Value":"http://testeof.com/write"},{"Key":"db","Value":"%{metadata.db}"},{"Key":"flusher_http_id","Value":"0"},{"Key":"project","Value":"p"},{"Key":"config_name","Value":"c"},{"Key":"plugins","Value":""},{"Key":"category","Value":"p"},{"Key":"source_ip","Value":"100.80.230.110"}]},{"Time":0,"Contents":[{"Key":"http_flusher_retry_count","Value":"2.0000"},{"Key":"__name__","Value":"http_flusher_retry_count"},{"Key":"RemoteURL","Value":"http://testeof.com/write"},{"Key":"db","Value":"%{metadata.db}"},{"Key":"flusher_http_id","Value":"0"},{"Key":"project","Value":"p"},{"Key":"config_name","Value":"c"},{"Key":"plugins","Value":""},{"Key":"category","Value":"p"},{"Key":"source_ip","Value":"100.80.230.110"}]},{"Time":0,"Contents":[{"Key":"http_flusher_flush_failure_count","Value":"2.0000"},{"Key":"__name__","Value":"http_flusher_flush_failure_count"},{"Key":"db","Value":"%{metadata.db}"},{"Key":"flusher_http_id","Value":"0"},{"Key":"RemoteURL","Value":"http://testeof.com/write"},{"Key":"project","Value":"p"},{"Key":"config_name","Value":"c"},{"Key":"plugins","Value":""},{"Key":"category","Value":"p"},{"Key":"source_ip","Value":"100.80.230.110"}]},{"Time":0,"Contents":[{"Key":"http_flusher_flush_latency_ns","Value":"2504448312.5000"},{"Key":"__name__","Value":"http_flusher_flush_latency_ns"},{"Key":"db","Value":"%{metadata.db}"},{"Key":"flusher_http_id","Value":"0"},{"Key":"RemoteURL","Value":"http://testeof.com/write"},{"Key":"project","Value":"p"},{"Key":"config_name","Value":"c"},{"Key":"plugins","Value":""},{"Key":"category","Value":"p"},{"Key":"source_ip","Value":"100.80.230.110"}]}],"Category":"","Topic":"","Source":"","MachineUUID":""}
[]{
{"counters":"{\"http_flusher_dropped_events\":\"3.0000\",\"http_flusher_flush_failure_count\":\"6.0000\",\"http_flusher_matched_events\":\"1.0000\",\"http_flusher_retry_count\":\"5.0000\",\"http_flusher_unmatched_events\":\"2.0000\"}","gauges":"{\"http_flusher_flush_latency_ns\":\"7.0000\"}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\"}"}
{"counters":"{\"http_flusher_status_code_count\":\"8.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"status_code\":\"200\"}"}
{"counters":"{\"http_flusher_status_code_count\":\"9.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"status_code\":\"400\"}"}
{"counters":"{\"http_flusher_error_count\":\"10.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"level\":\"error\",\"reason\":\"timeout\"}"}
{"counters":"{\"http_flusher_error_count\":\"11.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"level\":\"warn\",\"reason\":\"retry\"}"}
{"counters":"{\"http_flusher_error_count\":\"12.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"level\":\"error\",\"reason\":\"dropped\"}"}
}
```

一组LogGroup中会有多条Log,每一条Log都对应一条指标,其中`
{"Key":"__name__","Value":"http_flusher_matched_events"}
`是一个特殊的Label,代表指标的名字。
每个map[string]string都会包含同label的多条指标序列。

## 高级功能

Expand All @@ -97,11 +98,11 @@ type FlusherHTTP struct {
}
```

声明并注册MetricVector时,可以使用helper.New**XXX**MetricVectorAndRegister方法,
声明并注册MetricVector时,可以使用selfmonitor.New**XXX**MetricVectorAndRegister方法,
需要将其带有哪些动态Label的Name也进行声明:

```go
f.statusCodeStatistics = helper.NewCounterMetricVectorAndRegister(metricsRecord,
f.statusCodeStatistics = selfmonitor.NewCounterMetricVectorAndRegister(metricsRecord,
"http_flusher_status_code_count",
map[string]string{"RemoteURL": f.RemoteURL},
[]string{"status_code"},
Expand Down
5 changes: 2 additions & 3 deletions pkg/helper/k8smeta/k8s_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,8 @@ func GetMetaManagerMetrics() []map[string]string {
},
}

return []map[string]string{
manager.metricRecord.ExportMetricRecords(),
}
return manager.metricRecord.ExportMetricRecords()

}

func (m *MetaManager) runServer() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/helper/local_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (p *LocalContext) ExportMetricRecords() []map[string]string {

records := make([]map[string]string, 0)
for _, metricsRecord := range p.MetricsRecords {
records = append(records, metricsRecord.ExportMetricRecords())
records = append(records, metricsRecord.ExportMetricRecords()...)
}
return records
}
Expand Down
184 changes: 156 additions & 28 deletions pkg/selfmonitor/metrics_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,179 @@ type MetricsRecord struct {
MetricCollectors []MetricCollector
}

func (m *MetricsRecord) insertLabels(record map[string]string) {
labels := map[string]string{}
for _, label := range m.Labels {
labels[label.Key] = label.Value
}
labelsStr, _ := json.Marshal(labels)
record[MetricLabelPrefix] = string(labelsStr)
}

// RegisterMetricCollector is used for registering metric collector (vector).
func (m *MetricsRecord) RegisterMetricCollector(collector MetricCollector) {
m.Lock()
defer m.Unlock()
m.MetricCollectors = append(m.MetricCollectors, collector)
}

// ExportMetricRecords is used for exporting metrics records.
// It will replace Serialize in the future.
func (m *MetricsRecord) ExportMetricRecords() map[string]string {
// ExportMetricRecords exports all metrics bound to this metric record.
// The results may be a list of map[string]string, each map[string]string is a set of measurements has the same labels.
// for example:
// []{
// {"counters":"{\"http_flusher_dropped_events\":\"3.0000\",\"http_flusher_flush_failure_count\":\"6.0000\",\"http_flusher_matched_events\":\"1.0000\",\"http_flusher_retry_count\":\"5.0000\",\"http_flusher_unmatched_events\":\"2.0000\"}","gauges":"{\"http_flusher_flush_latency_ns\":\"7.0000\"}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\"}"}
// {"counters":"{\"http_flusher_status_code_count\":\"8.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"status_code\":\"200\"}"}
// {"counters":"{\"http_flusher_status_code_count\":\"9.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"status_code\":\"400\"}"}
// {"counters":"{\"http_flusher_error_count\":\"10.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"level\":\"error\",\"reason\":\"timeout\"}"}
// {"counters":"{\"http_flusher_error_count\":\"11.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"level\":\"warn\",\"reason\":\"retry\"}"}
// {"counters":"{\"http_flusher_error_count\":\"12.0000\"}","gauges":"{}","labels":"{\"PluginId\":\"13\",\"PluginType\":\"flusher_http\",\"RemoteURL\":\"http://localhost:8081\",\"level\":\"error\",\"reason\":\"dropped\"}"}
// }
// Note:
// A metric may have three levels of labels
// 1. MetricsRecord Level Const Labels, like PluginType=flusher_http, PluginId=1
// 2. Metric Level Const Labels, for example, flusher_http may have a const label: RemoteURL=http://aliyun.com/write
// 3. Metric Level Dynamic Labels, like status_code=200, status_code=204
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是不是可以这样理解:

  1. MetricsRecord级别有一些label,也有一些没有自己label的metrics,他们会输出一个metric记录,也就是Line45这一条
  2. 每个Metric会有自己的一些const和Dynamic label,当有这些label时,一个metric会变成一条单独的metric记录被暴露出来,例如上面的http_flusher_status_code_count * 2,http_flusher_error_count * 3?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

指标是有类型的,go这边key是metric_category,value是agent、runner、这些。https://observability.cn/project/loongcollector/internal-metrics-description/#_top

具有单独label的Metric,是不是需要定义为plugin_source级?这块有考虑过吗

func (m *MetricsRecord) ExportMetricRecords() []map[string]string {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

方便给一个前后对比吗?先前有问题的指标和修改之后的样子

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我晚点补一下

m.RLock()
defer m.RUnlock()

record := map[string]string{}
counters := map[string]string{}
gauges := map[string]string{}
m.insertLabels(record)
res := []map[string]string{}

curCounters := map[string]string{}
currGauges := map[string]string{}
commonLabels := m.getConstLabels()
currLabels := m.getConstLabels()
currSeriesCount := 0

for _, metricCollector := range m.MetricCollectors {
metrics := metricCollector.Collect()
for _, metric := range metrics {
singleMetric := metric.Export()
if len(singleMetric) == 0 {
metricVectors := metricCollector.Collect()

for _, metric := range metricVectors {
measurement := metric.Export()
if len(measurement) == 0 {
continue
}

if currSeriesCount > 0 && hasDifferentLabels(currLabels, commonLabels, measurement) {
res = append(res, buildRecord(currLabels, currGauges, curCounters))
currLabels = m.getConstLabels()
curCounters = map[string]string{}
currGauges = map[string]string{}
currSeriesCount = 0
}

currName := getMetriName(measurement)
if currName == "" {
continue
}
valueName := singleMetric[SelfMetricNameKey]
valueValue := singleMetric[valueName]

currValue := getMetricValue(measurement)
if currValue == "" {
continue
}

currMeasurementLabels := getLabels(measurement)
for key, value := range currMeasurementLabels {
currLabels[key] = value
}

if metric.Type() == CounterType {
counters[valueName] = valueValue
curCounters[currName] = currValue
currSeriesCount++
}
if metric.Type() == GaugeType {
gauges[valueName] = valueValue
currGauges[currName] = currValue
currSeriesCount++
}

}
}
countersStr, _ := json.Marshal(counters)
record[MetricCounterPrefix] = string(countersStr)

if currSeriesCount > 0 {
res = append(res, buildRecord(currLabels, currGauges, curCounters))
}
return res
}

func (m *MetricsRecord) getConstLabels() map[string]string {
labels := map[string]string{}
for _, label := range m.Labels {
labels[label.Key] = label.Value
}
return labels
}

// hasDifferentLabels checks if the next measurement has different labels compared to current labels.
// 1. currLabels: Labels from the current metric instance
// 2. commonLabels: Shared labels that should be present in all metrics
// 3. nextMeasurement: The next measurement to check
func hasDifferentLabels(currLabels map[string]string, commonLabels map[string]string, nextMeasurement map[string]string) bool {
// Check if next measurement's dynamic labels differ from current labels

newLabelCount := 0
for key, value := range nextMeasurement {
if !isLabel(key, nextMeasurement) {
continue
}

// If label exists but value differs, labels are different
currValue, exists := currLabels[key]
if !exists || currValue != value {
return true
}
newLabelCount++
}

// Check if common labels differ from current labels.
// This only happens if the current metrics override the common labels.
for key, value := range commonLabels {
if value != currLabels[key] {
return true
}
if _, ok := nextMeasurement[key]; !ok {
newLabelCount++
}
}

return newLabelCount != len(currLabels)
}

// getMetriName returns the name of the metric.
// singleMeasurement is a single measurement of a metric.
// for example, a measurement of "flusher_http_flush_count" may looks like:
// {__name__="flusher_http_flush_count", flusher_http_flush_count=1024, remote_url=http://localhost:8080/write, status_code=200}
// and its name is "flusher_http_flush_count".
func getMetriName(singleMeasurement map[string]string) string {
return singleMeasurement[SelfMetricNameKey]
}

// getMetricValue returns the value of the metric.
// for example, a measurement of "flusher_http_flush_count" may looks like:
// {__name__="flusher_http_flush_count", flusher_http_flush_count=1024, remote_url=http://localhost:8080/write, status_code=200}
// and its value is "1024".
func getMetricValue(singleMeasurement map[string]string) string {
return singleMeasurement[getMetriName(singleMeasurement)]
}

// getLabels returns the labels of the metric.
// for example, a measurement of "flusher_http_flush_count" may looks like:
// {__name__="flusher_http_flush_count", flusher_http_flush_count=1024, remote_url=http://localhost:8080/write, status_code=200}
// and its labels is {"remote_url": "http://localhost:8080/write", "status_code": "200"}
func getLabels(singleMeasurement map[string]string) map[string]string {
labels := map[string]string{}
for key, value := range singleMeasurement {
if isLabel(key, singleMeasurement) {
labels[key] = value
}
}
return labels
}

// isLabel returns true if the key is a label.
func isLabel(key string, singleMetric map[string]string) bool {
return key != SelfMetricNameKey && key != getMetriName(singleMetric)
}

func buildRecord(labels, gauges, counters map[string]string) map[string]string {
records := make(map[string]string, 3)
labelsStr, _ := json.Marshal(labels)
records[MetricLabelPrefix] = string(labelsStr)

gaugesStr, _ := json.Marshal(gauges)
record[MetricGaugePrefix] = string(gaugesStr)
return record
records[MetricGaugePrefix] = string(gaugesStr)

countersStr, _ := json.Marshal(counters)
records[MetricCounterPrefix] = string(countersStr)
return records
}
Loading
Loading