Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 172 additions & 4 deletions docs/cn/plugins/input/extended/service-kubernetesmeta-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 简介

`service_kubernetes_meta` 定时采集Kubernetes元数据,包括Pod、Deployment等资源及其之间的关系。并提供HTTP查询接口,支持通过一些字段索引,如Pod IP、Host IP等信息快速查询元数据
`service_kubernetes_meta` 定时采集 Kubernetes 元数据,包括 Pod、Deployment 等内置资源及其关系;可通过 **`CustomResources`** 扩展采集第三方 CR(如 Argo Workflow),并生成对应实体与链路日志。提供 HTTP 查询接口,支持通过 Pod IP、Host IP 等索引快速查询元数据

## 版本

Expand All @@ -14,7 +14,7 @@

## 配置参数

**注意:** 本插件需要在Kubernetes集群中运行,且需要有访问Kubernetes API的权限。并且部署模式为单例模式,且配置环境变量`DEPLOY_MODE`为`singleton``ENABLE_KUBERNETES_META`为`true`。
**注意:** 本插件需要在 Kubernetes 集群中(或具备访问 apiserver 的配置)运行,且需要有访问Kubernetes API的权限。并且部署模式为单例模式,且配置环境变量 `DEPLOY_MODE=singleton``ENABLE_KUBERNETES_META=true`。

| 参数 | 类型,默认值 | 说明 |
| - | - | - |
Expand All @@ -34,8 +34,9 @@
| PersistentVolumeClaim | bool, false | 是否采集PersistentVolumeClaim元数据。 |
| StorageClass | bool, false | 是否采集StorageClass元数据。 |
| Ingress | bool, false | 是否采集Ingress元数据。 |
| EnableLabels | bool, false | 是否采集Kubernetes资源的标签(Labels)信息。启用后会在元数据中包含资源的标签字段。 |
| EnableAnnotations | bool, false | 是否采集Kubernetes资源的注解(Annotations)信息。启用后会在元数据中包含资源的注解字段。 |
| EnableLabels | bool, false | 是否采集**内置**Kubernetes 资源的标签(Labels)。 |
| EnableAnnotations | bool, false | 是否采集**内置**资源的注解(Annotations)。 |
| CustomResources | []object,可选 | 第三方 CR(动态 Informer)采集与链路,见下文「**第三方自定义资源(CustomResources)**」与「**Kubernetes RBAC 权限**」。 |
| Node2Pod | string, 无默认值(可选) | Node到Pod的关系名,不填则不生成关系。 |
| Deployment2Pod | string, 无默认值(可选) | Deployment到Pod的关系名,不填则不生成关系。 |
| ReplicaSet2Pod | string, 无默认值(可选) | ReplicaSet到Pod的关系名,不填则不生成关系。 |
Expand Down Expand Up @@ -64,6 +65,173 @@
| Cluster2PersistentVolume | string, 无默认值(可选) | Cluster到PersistentVolume的关系名,不填则不生成关系。 |
| Cluster2StorageClass | string, 无默认值(可选) | Cluster到StorageClass的关系名,不填则不生成关系。 |

## Kubernetes RBAC 权限

本插件通过 **ServiceAccount**(或 kubeconfig 身份)访问 kube-apiserver。除内置资源外,凡在配置中开启的采集与 **CustomResources** 中声明的 GVR,都需要在 **`ClusterRole`(推荐集群级采集)+ `ClusterRoleBinding`** 或对应的 **`Role` + `RoleBinding`** 中授予至少 **`get`、`list`、`watch`**。

### 常见错误

动态 Informer 对 CR `list/watch` 失败时,日志中可能出现类似:

`cannot list resource "workflows" in API group "argoproj.io" ... User "system:serviceaccount:..." cannot list ...`

说明当前身份**缺少该 API 组下对应复数资源**的权限,与 LoongCollector 配置无关,需在 RBAC 中补齐。

若 **CustomResources** 中某一 CR 的 watch **连续出现 RBAC/鉴权类错误**并达到停止阈值,该 CR 的动态 Informer 会在当前进程内停止且不会自动恢复,修正权限后须**手动重启 LoongCollector 进程**方可继续采集该类型。

同样地,若启动时 discovery 检测到该 CR 的 **APIGroup/APIVersion/Resource 不可用**(例如 CRD 尚未安装),该 CR 的动态 Informer 在当前进程内也不会自动恢复。这是 **by-design** 行为(用于避免重复启动与日志风暴),修正后需**手动重启 LoongCollector**。

### 内置资源

与配置里打开的内置开关对应即可(如 `pods`、`namespaces`、`deployments` 等)。权限请保持最小可用**`get`、`list`、`watch`** 权限,下面是示例。
```yaml
- apiGroups: [""]
resources:
- configmaps
- nodes
- pods
- services
- persistentvolumeclaims
- persistentvolumes
- namespaces
- endpoints
verbs: ["get", "list", "watch"]
- apiGroups: ["extensions"]
resources:
- daemonsets
- deployments
- replicasets
- ingresses
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- daemonsets
- deployments
- replicasets
- statefulsets
verbs: ["get", "list", "watch"]
- apiGroups: ["batch"]
resources:
- cronjobs
- jobs
verbs: ["get", "list", "watch"]
- apiGroups:
- networking.k8s.io
resources:
- ingresses
- networkpolicies
verbs:
- get
- list
- watch
- apiGroups:
- storage.k8s.io
resources:
- storageclasses
- volumeattachments
verbs:
- get
- list
- watch
```

### 自定义资源示例:Argo Workflow

采集 `argoproj.io/v1alpha1` 的 `Workflow`(复数资源名 **`workflows`**)时,在 **ClusterRole** 的 `rules` 中增加(按需合并到现有角色):

```yaml
- apiGroups:
- argoproj.io
resources:
- workflows
verbs:
- get
- list
- watch
```

* **`resources`** 须为 CRD 中 **`spec.names.plural`**,可用 `kubectl api-resources --api-group=argoproj.io` 核对。
* 使用 **ClusterRoleBinding** 绑定到运行采集的 ServiceAccount 时,可在全命名空间内 `list/watch` 该资源(Workflow 一般为命名空间作用域资源)。
* 若还需采集同组其它 CR(如 `workflowtemplates`),在 `resources` 列表中继续追加即可。
* 不推荐在生产中用 `resources: ["*"]` 放宽整组权限,除非有明确运维规范。

应用后等待 Informer 重试或重启相关 Pod 使权限生效。


## 第三方自定义资源(CustomResources)

`CustomResources` 为数组,每一项对应一种 CR(**GVR + Kind**),由动态客户端监听;可选生成实体日志及 **Pod→CR**、**Namespace→CR** 链路。

与内置资源相比,CR 在代码里走 **`crUnifiedCache`(dynamic informer + `unstructured`)**,内置走 **`k8sMetaCache`(typed informer)**;二者均实现 **`MetaCache`** 并共用 **`DeferredDeletionMetaStore`**,差异主要在客户端与启动时机。对比如下(**供开发与排障参考**)。

### 内置缓存与 CR 缓存对比

| 维度 | `k8sMetaCache`(内置资源) | `crUnifiedCache`(CustomResources) |
|------|---------------------------|-------------------------------------|
| **K8s 客户端** | `kubernetes.Clientset` | `dynamic.Interface`(`dynamic.NewForConfig`) |
| **Informer** | `informers.SharedInformerFactory` + 各资源 Typed Informer | `dynamicinformer.DynamicSharedInformerFactory` + `ForResource(GVR).Informer()` |
| **资源标识** | 内部 `resourceType` 常量 + `getFactoryInformer` 分支 | `schema.GroupVersionResource`(GVR),`resourceType` 多为配置中的 `EntityType` |
| **对象形态** | 具体 API 类型(如 `*v1.Pod`),经 `preProcess` 等处理 | 统一 `*unstructured.Unstructured`,`objectToUnstructured` |
| **REST / 内容协商** | 跟随 `MetaManager` 为 clientset 设置的配置(含 protobuf 等) | `restConfigForDynamicClient`:**Dynamic 使用 JSON**,与 clientset 的 protobuf 区分 |
| **何时起 watch** | `init(clientset)` 内:`metaStore.Start()` + `watch()` | `init` 不占 clientset;`setRESTConfig` 后由 `EnsureWatchStarted()`(`sync.Once`)**延迟启动** |
| **`watch` 方法** | 完整实现(factory、事件、`WaitForCacheSync` 等) | 空实现;逻辑在 `EnsureWatchStarted` 内 |
| **索引** | `getIdxRules(resourceType)`(如 Host IP 等) | `generateCommonKey` 等 CR 侧规则 |
| **体积优化** | 按资源类型的 `preProcess` | 如 `trimCRObjectForCache`(裁剪 `spec`、`managedFields` 等) |
| **与 `MetaManager.Init` 顺序 init** | 各内置 cache 的 `watch` 参与**顺序**初始化链 | `init` 轻量;避免 GVR / REST 未就绪即起 Informer |

### 子项字段

| 字段 | 类型 | 说明 |
| - | - | - |
| EntityType | string,**必填** | 内部缓存与事件类型键,用于 `__entity_type__`、实体 ID 及链路类型(如 `pod-><EntityType>`)。须在多条 CR 配置间唯一。 |
| APIGroup | string,必填 | CRD 的 API 组,如 `argoproj.io`。 |
| APIVersion | string,必填 | 版本,如 `v1alpha1`。 |
| Resource | string,必填 | **复数**资源名,如 `workflows`(与 `kubectl api-resources` / CRD `spec.names.plural` 一致)。 |
| Kind | string,必填 | Kubernetes **Kind**,如 `Workflow`;用于 `ownerReferences` 匹配及实体日志中的 `kind` 等。 |
| CollectEntity | bool | 为 true 时投递该 CR 的实体(entity)日志。 |
| PodLink | object,可选 | 配置后,在开启 **Pod** 的前提下可生成 **Pod→CR** 链路;需同时配置 **`Entity2PodRelation`**。 |
| Entity2PodRelation | string,可选 | entity_link 中 **`__relation_type__`**(CR 与 Pod 之间的业务关系名);与 `PodLink` 同时非空时生效。 |
| Namespace2EntityRelation | string,可选 | entity_link 中 **`__relation_type__`**(Namespace 与该 CR);需 **`CollectEntity: true`**、顶层 **`Namespace: true`** 且本字段非空;仅**有命名空间**的 CR 会生成(集群级 CR 跳过)。 |
| EnableLabels | bool | 为 true 时导出**全部** labels;不受顶层 `EnableLabels` 影响。默认不导出。 |
| EnableAnnotations | bool | 为 true 时导出**全部** annotations;不受顶层 `EnableAnnotations` 影响。默认不导出。 |

**PodLink** 子字段:

| 字段 | 说明 |
| - | - |
| OwnerKind | 匹配 Pod `ownerReferences[].Kind`,默认可与条目的 `Kind` 一致。 |
| OwnerAPIGroupContains | 与 `ownerReferences[].apiVersion` 做子串匹配;空则默认使用条目的 `APIGroup`。 |
| PodLabelKey | 无匹配 owner 时的回退标签,如 Argo 的 `workflows.argoproj.io/workflow`。 |

### 配置示例:Argo Workflow

```yaml
enable: true
inputs:
- Type: service_kubernetes_meta
Interval: 600
Node: true
Pod: true
# Third-party CRs: configure each GVR (and optional PodLink / Entity2PodRelation) under CustomResources.
CustomResources:
- APIGroup: argoproj.io # API-Group
APIVersion: v1alpha1 # API version
Resource: workflows # 资源类型
Kind: Workflow # kind信息
EntityType: argo.workflow #实体类型
CollectEntity: true
Entity2PodRelation: "contains"
Namespace2EntityRelation: "contains"
PodLink: # 和Pod的关联关系提取
OwnerKind: Workflow # Pod OwnerReference 对应的资源Kind
OwnerAPIGroupContains: argoproj.io # Pod OwnerReference 对应的 API-Group
PodLabelKey: workflows.argoproj.io/workflow # 兜底逻辑,从label中提取关联关系
EnableLabels: true # CR粒度配置是否上报标签
EnableAnnotations: true # CR粒度配置是否上报注释
```

同时请确保 **RBAC** 已授予对 `argoproj.io` / `workflows` 的 `get、list、watch`(见上文「Kubernetes RBAC 权限」)。


## 环境变量

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
k8s.io/api v0.32.1
k8s.io/apimachinery v0.32.1
k8s.io/client-go v0.32.1
sigs.k8s.io/controller-runtime v0.12.1
)

require (
Expand Down Expand Up @@ -281,7 +282,6 @@ require (
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
sigs.k8s.io/controller-runtime v0.12.1 // indirect
sigs.k8s.io/gateway-api v0.6.2 // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
Expand Down
33 changes: 14 additions & 19 deletions pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type k8sMetaCache struct {

resourceType string
schema *runtime.Scheme

giveUp *informerGiveUp
}

func newK8sMetaCache(stopCh chan struct{}, resourceType string) *k8sMetaCache {
Expand All @@ -44,6 +46,7 @@ func newK8sMetaCache(stopCh chan struct{}, resourceType string) *k8sMetaCache {
m.metaStore = NewDeferredDeletionMetaStore(m.eventCh, m.stopCh, 120, cache.MetaNamespaceKeyFunc, idxRules...)
m.resourceType = resourceType
m.schema = runtime.NewScheme()
m.giveUp = newInformerGiveUp()
_ = v1.AddToScheme(m.schema)
_ = batch.AddToScheme(m.schema)
_ = batchv1beta1.AddToScheme(m.schema)
Expand Down Expand Up @@ -90,11 +93,19 @@ func (m *k8sMetaCache) UnRegisterSendFunc(key string) {
}

func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
_ = stopCh // MetaCache uses m.stopCh for global shutdown; parameter kept for interface compatibility.
defer panicRecover()
factory, informer := m.getFactoryInformer()
if informer == nil {
return
}
mergedStop := m.giveUp.mergedStop(m.stopCh)
if err := attachWatchErrorHandler(informer, m.giveUp, watchErrorHandlerOpts{
ResourceType: m.resourceType,
GiveUpStopMsg: "stopping informer after repeated errors (RBAC/auth or missing API resource; no further retries)",
}); err != nil {
logger.Error(context.Background(), K8sMetaUnifyErrorCode, "fail to set watch error handler", err)
}
_, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
defer panicRecover()
Expand Down Expand Up @@ -137,16 +148,8 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
metaManager.deleteEventCount.Add(1)
},
})
go factory.Start(stopCh)
// wait infinite for first cache sync success
for {
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
logger.Error(context.Background(), K8sMetaUnifyErrorCode, "service cache sync timeout")
time.Sleep(1 * time.Second)
} else {
break
}
}
go factory.Start(mergedStop)
waitInformerCacheSync(mergedStop, informer.HasSynced, informerCacheSyncOpts{ResourceType: m.resourceType})
}

func (m *k8sMetaCache) getFactoryInformer() (informers.SharedInformerFactory, cache.SharedIndexInformer) {
Expand Down Expand Up @@ -198,15 +201,6 @@ func (m *k8sMetaCache) getFactoryInformer() (informers.SharedInformerFactory, ca
if informer == nil {
return factory, nil
}
// add watch error handler
err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
if err != nil {
logger.Error(context.Background(), K8sMetaUnifyErrorCode, "resourceType", m.resourceType, "watchError", err)
}
})
if err != nil {
logger.Error(context.Background(), K8sMetaUnifyErrorCode, "fail to handle watch error handler", err)
}
return factory, informer
}

Expand Down Expand Up @@ -449,6 +443,7 @@ func containsResource(resources []metav1.APIResource, name string) bool {
}
return false
}

func generateNodeKey(obj interface{}) ([]string, error) {
node, err := meta.Accessor(obj)
if err != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/helper/k8smeta/k8s_meta_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

const (
Expand All @@ -27,7 +28,6 @@ const (
STORAGECLASS = "storageclass"
INGRESS = "ingress"
CONTAINER = "container"
// entity link type, the direction is from resource which will be trigger to linked resource
//revive:disable:var-naming
LINK_SPLIT_CHARACTER = "->"
POD_NODE = "pod->node"
Expand Down Expand Up @@ -202,6 +202,18 @@ type IngressNamespace struct {
Namespace *v1.Namespace
}

// PodCustomResource links a Pod to an arbitrary CR stored as unstructured.
type PodCustomResource struct {
Pod *v1.Pod
CR *unstructured.Unstructured
}

// NamespaceCustomResource links a Namespace to a namespaced CR (unstructured).
type NamespaceCustomResource struct {
Namespace *v1.Namespace
CR *unstructured.Unstructured
}

const (
EventTypeAdd = "add"
EventTypeUpdate = "update"
Expand Down
Loading
Loading