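diff --git a/docs/cn/plugins/input/extended/service-kubernetesmeta-v2.md b/docs/cn/plugins/input/extended/service-kubernetesmeta-v2.md index 0e88dcce34..91085a510d 100644 --- a/docs/cn/plugins/input/extended/service-kubernetesmeta-v2.md +++ b/docs/cn/plugins/input/extended/service-kubernetesmeta-v2.md @@ -2,7 +2,7 @@ ## Introduction -`service_kubernetes_meta` periodically collects Kubernetes metadata, including resources such as Pod and Deployment and the relationships between them. It also provides an HTTP query interface that supports fast metadata lookup through indexed fields such as Pod IP and Host IP. +`service_kubernetes_meta` periodically collects Kubernetes metadata, including built-in resources such as Pod and Deployment and their relationships. It can be extended through **`CustomResources`** to collect third-party CRs (such as Argo Workflow) and to generate the corresponding entity and link logs. It provides an HTTP query interface that supports fast metadata lookup through indexes such as Pod IP and Host IP. ## Version @@ -14,7 +14,7 @@ ## Configuration Parameters -**Note:** This plugin must run inside a Kubernetes cluster and needs permission to access the Kubernetes API. It must be deployed in singleton mode, with the environment variable `DEPLOY_MODE` set to `singleton` and `ENABLE_KUBERNETES_META` set to `true`. +**Note:** This plugin must run inside a Kubernetes cluster (or with a configuration that can reach the apiserver) and needs permission to access the Kubernetes API. It must be deployed in singleton mode, with the environment variables `DEPLOY_MODE=singleton` and `ENABLE_KUBERNETES_META=true`. | Parameter | Type, default | Description | | - | - | - | @@ -34,8 +34,9 @@ | PersistentVolumeClaim | bool, false | Whether to collect PersistentVolumeClaim metadata. | | StorageClass | bool, false | Whether to collect StorageClass metadata. | | Ingress | bool, false | Whether to collect Ingress metadata. | -| EnableLabels | bool, false | Whether to collect the labels of Kubernetes resources. When enabled, the metadata includes the resources' label fields. | -| EnableAnnotations | bool, false | Whether to collect the annotations of Kubernetes resources. When enabled, the metadata includes the resources' annotation fields. | +| EnableLabels | bool, false | Whether to collect the labels of **built-in** Kubernetes resources. | +| EnableAnnotations | bool, false | Whether to collect the annotations of **built-in** resources. | +| CustomResources | []object, optional | Collection and links for third-party CRs (dynamic informer); see "**Third-Party Custom Resources (CustomResources)**" and "**Kubernetes RBAC Permissions**" below. | | Node2Pod | string, no default (optional) | Relation name from Node to Pod; if empty, no relation is generated. | | Deployment2Pod | string, no default (optional) | Relation name from Deployment to Pod; if empty, no relation is generated. | | ReplicaSet2Pod | string, no default (optional) | Relation name from ReplicaSet to Pod; if empty, no relation is generated. | @@ -64,6 +65,173 @@ | Cluster2PersistentVolume | string, no default (optional) | Relation name from Cluster to PersistentVolume; if empty, no relation is generated. | | Cluster2StorageClass | string, no default (optional) | Relation name from Cluster to StorageClass; if empty, no relation is generated. | 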
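+## Kubernetes RBAC Permissions + +This plugin accesses kube-apiserver through a **ServiceAccount** (or a kubeconfig identity). In addition to built-in resources, every collection enabled in the configuration and every GVR declared in **CustomResources** must be granted at least **`get`, `list`, `watch`** in a **`ClusterRole` (recommended for cluster-wide collection) + `ClusterRoleBinding`**, or in the corresponding **`Role` + `RoleBinding`**. + +### Common Errors + +When the dynamic informer fails to `list/watch` a CR, the log may contain a message like: + +`cannot list resource "workflows" in API group "argoproj.io" ... User "system:serviceaccount:..." cannot list ...` + +This means the current identity **lacks permission on the corresponding plural resource in that API group**. It is unrelated to the LoongCollector configuration and must be fixed in RBAC. + +If the watch for a CR in **CustomResources** **hits RBAC/authorization errors consecutively** and reaches the stop threshold, the dynamic informer for that CR stops within the current process and does not recover automatically. After fixing the permissions, you must **manually restart the LoongCollector process** to resume collecting that type. + +Likewise, if discovery finds at startup that the CR's **APIGroup/APIVersion/Resource is unavailable** (for example, the CRD is not installed yet), the dynamic informer for that CR also does not recover automatically within the current process. This is **by-design** behavior (it avoids repeated start attempts and log storms); after fixing the issue, **manually restart LoongCollector**. + +### Built-in Resources + +Grant permissions that match the built-in switches enabled in the configuration (such as `pods`, `namespaces`, `deployments`). Keep permissions to the minimum required, **`get`, `list`, `watch`**; an example follows. +```yaml + - apiGroups: [""] + resources: + - configmaps + - nodes + - pods + - services + - persistentvolumeclaims + - persistentvolumes + - namespaces + - endpoints + verbs: ["get", "list", "watch"] + - apiGroups: ["extensions"] + resources: + - daemonsets + - deployments + - replicasets + - ingresses + verbs: ["get", "list", "watch"] + - apiGroups: ["apps"] + resources: + - daemonsets + - deployments + - replicasets + - statefulsets + verbs: ["get", "list", "watch"] + - apiGroups: ["batch"] + resources: + - cronjobs + - jobs + verbs: ["get", "list", "watch"] + - apiGroups: + - networking.k8s.io + resources: + - ingresses + - networkpolicies + verbs: + - get + - list + - watch + - apiGroups: + - storage.k8s.io + resources: + - storageclasses + - volumeattachments + verbs: + - get + - list + - watch +``` + +### Custom Resource Example: Argo Workflow + +To collect the `Workflow` of `argoproj.io/v1alpha1` (plural resource name **`workflows`**), add the following to the `rules` of the **ClusterRole** (merge into an existing role as needed): + +```yaml + - apiGroups: + - argoproj.io + resources: + - workflows + verbs: + - get + - list + - watch +``` 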
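+ +* **`resources`** must be the CRD's **`spec.names.plural`**; verify it with `kubectl api-resources --api-group=argoproj.io`. +* When a **ClusterRoleBinding** binds the role to the ServiceAccount that runs the collector, the resource can be `list/watch`ed across all namespaces (Workflow is normally a namespace-scoped resource). +* To collect other CRs in the same group (such as `workflowtemplates`), append them to the `resources` list. +* Using `resources: ["*"]` to widen permissions for the whole group is not recommended in production unless there is a clear operational policy. + +After applying the change, wait for the informer to retry, or restart the relevant Pod, so that the permissions take effect. + + +## Third-Party Custom Resources (CustomResources) + +`CustomResources` is an array; each item corresponds to one CR type (**GVR + Kind**) watched by the dynamic client. It can optionally generate entity logs and **Pod→CR** and **Namespace→CR** links. + +Compared with built-in resources, CRs go through **`crUnifiedCache` (dynamic informer + `unstructured`)** in the code, while built-in resources go through **`k8sMetaCache` (typed informer)**. Both implement **`MetaCache`** and share **`DeferredDeletionMetaStore`**; they differ mainly in the client and in when the watch starts. The comparison below is **a reference for development and troubleshooting**. + +### Built-in Cache vs. CR Cache + +| Dimension | `k8sMetaCache` (built-in resources) | `crUnifiedCache` (CustomResources) | +|------|---------------------------|-------------------------------------| +| **K8s client** | `kubernetes.Clientset` | `dynamic.Interface` (`dynamic.NewForConfig`) | +| **Informer** | `informers.SharedInformerFactory` + a typed informer per resource | `dynamicinformer.DynamicSharedInformerFactory` + `ForResource(GVR).Informer()` | +| **Resource identifier** | Internal `resourceType` constants + `getFactoryInformer` branches | `schema.GroupVersionResource` (GVR); `resourceType` is usually the configured `EntityType` | +| **Object form** | Concrete API types (such as `*v1.Pod`), processed by `preProcess` and similar | Uniformly `*unstructured.Unstructured`, via `objectToUnstructured` | +| **REST / content negotiation** | Follows the configuration that `MetaManager` sets for the clientset (including protobuf) | `restConfigForDynamicClient`: **the dynamic client uses JSON**, distinct from the clientset's protobuf | +| **When the watch starts** | Inside `init(clientset)`: `metaStore.Start()` + `watch()` | `init` does not use the clientset; after `setRESTConfig`, `EnsureWatchStarted()` (`sync.Once`) **starts it lazily** | +| **`watch` method** | Full implementation (factory, events, `WaitForCacheSync`, etc.) | Empty implementation; the logic lives in `EnsureWatchStarted` | +| **Indexes** | `getIdxRules(resourceType)` (such as Host IP) | CR-side rules such as `generateCommonKey` | +| **Size optimization** | Per-resource-type `preProcess` | For example `trimCRObjectForCache` (trims `spec`, `managedFields`, etc.) | +| **Sequential init with `MetaManager.Init`** | Each built-in cache's 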
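`watch` takes part in the **sequential** initialization chain | `init` is lightweight; avoids starting the informer before the GVR / REST config is ready | + +### Item Fields + +| Field | Type | Description | +| - | - | - | +| EntityType | string, **required** | Key for the internal cache and the event type; used for `__entity_type__`, the entity ID, and link types (such as `pod->{EntityType}`). Must be unique across CR entries. | +| APIGroup | string, required | API group of the CRD, such as `argoproj.io`. | +| APIVersion | string, required | Version, such as `v1alpha1`. | +| Resource | string, required | **Plural** resource name, such as `workflows` (matching `kubectl api-resources` / the CRD's `spec.names.plural`). | +| Kind | string, required | Kubernetes **Kind**, such as `Workflow`; used for `ownerReferences` matching and for fields such as `kind` in entity logs. | +| CollectEntity | bool | When true, entity logs are emitted for this CR. | +| PodLink | object, optional | When configured, and with **Pod** collection enabled, **Pod→CR** links can be generated; **`Entity2PodRelation`** must be configured as well. | +| Entity2PodRelation | string, optional | The **`__relation_type__`** in entity_link (the business relation name between the CR and the Pod); takes effect only when both this field and `PodLink` are non-empty. | +| Namespace2EntityRelation | string, optional | The **`__relation_type__`** in entity_link (Namespace and this CR); requires **`CollectEntity: true`**, top-level **`Namespace: true`**, and a non-empty value for this field; generated only for **namespaced** CRs (cluster-scoped CRs are skipped). | +| EnableLabels | bool | When true, **all** labels are exported; not affected by the top-level `EnableLabels`. Not exported by default. | +| EnableAnnotations | bool | When true, **all** annotations are exported; not affected by the top-level `EnableAnnotations`. Not exported by default. | + +**PodLink** subfields: + +| Field | Description | +| - | - | +| OwnerKind | Matches the Pod's `ownerReferences[].Kind`; defaults to the entry's `Kind`. | +| OwnerAPIGroupContains | Substring-matched against `ownerReferences[].apiVersion`; if empty, the entry's `APIGroup` is used. | +| PodLabelKey | Fallback label when no owner matches, such as Argo's `workflows.argoproj.io/workflow`. | + +### Configuration Example: Argo Workflow + +```yaml +enable: true +inputs: + - Type: service_kubernetes_meta + Interval: 600 + Node: true + Pod: true + # Third-party CRs: configure each GVR (and optional PodLink / Entity2PodRelation) under CustomResources. 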
+ CustomResources: + - APIGroup: argoproj.io # API-Group + APIVersion: v1alpha1 # API version + Resource: workflows # resource type (plural name) + Kind: Workflow # Kind + EntityType: argo.workflow # entity type + CollectEntity: true + Entity2PodRelation: "contains" + Namespace2EntityRelation: "contains" + PodLink: # how to extract the association with Pods + OwnerKind: Workflow # resource Kind in the Pod's OwnerReference + OwnerAPIGroupContains: argoproj.io # API-Group in the Pod's OwnerReference + PodLabelKey: workflows.argoproj.io/workflow # fallback: extract the association from a label + EnableLabels: true # per-CR setting: whether to report labels + EnableAnnotations: true # per-CR setting: whether to report annotations +``` + +Also make sure **RBAC** grants `get, list, watch` on `argoproj.io` / `workflows` (see "Kubernetes RBAC Permissions" above). + ## Environment Variables diff --git a/go.mod b/go.mod index 37a6b8e0f6..4303cf26bb 100644 --- a/go.mod +++ b/go.mod @@ -70,6 +70,7 @@ require ( k8s.io/api v0.32.1 k8s.io/apimachinery v0.32.1 k8s.io/client-go v0.32.1 + sigs.k8s.io/controller-runtime v0.12.1 ) require ( @@ -281,7 +282,6 @@ require ( k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect - sigs.k8s.io/controller-runtime v0.12.1 // indirect sigs.k8s.io/gateway-api v0.6.2 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect diff --git a/pkg/helper/k8smeta/k8s_meta_cache.go b/pkg/helper/k8smeta/k8s_meta_cache.go index 6ff2ad926f..e5cd8484d6 100644 --- a/pkg/helper/k8smeta/k8s_meta_cache.go +++ b/pkg/helper/k8smeta/k8s_meta_cache.go @@ -34,6 +34,8 @@ type k8sMetaCache struct { resourceType string schema *runtime.Scheme + + giveUp *informerGiveUp } func newK8sMetaCache(stopCh chan struct{}, resourceType string) *k8sMetaCache { @@ -44,6 +46,7 @@ func newK8sMetaCache(stopCh chan struct{}, resourceType string) *k8sMetaCache { m.metaStore = NewDeferredDeletionMetaStore(m.eventCh, m.stopCh, 120, cache.MetaNamespaceKeyFunc, idxRules...) 
m.resourceType = resourceType m.schema = runtime.NewScheme() + m.giveUp = newInformerGiveUp() _ = v1.AddToScheme(m.schema) _ = batch.AddToScheme(m.schema) _ = batchv1beta1.AddToScheme(m.schema) @@ -90,11 +93,19 @@ func (m *k8sMetaCache) UnRegisterSendFunc(key string) { } func (m *k8sMetaCache) watch(stopCh <-chan struct{}) { + _ = stopCh // MetaCache uses m.stopCh for global shutdown; parameter kept for interface compatibility. defer panicRecover() factory, informer := m.getFactoryInformer() if informer == nil { return } + mergedStop := m.giveUp.mergedStop(m.stopCh) + if err := attachWatchErrorHandler(informer, m.giveUp, watchErrorHandlerOpts{ + ResourceType: m.resourceType, + GiveUpStopMsg: "stopping informer after repeated errors (RBAC/auth or missing API resource; no further retries)", + }); err != nil { + logger.Error(context.Background(), K8sMetaUnifyErrorCode, "fail to set watch error handler", err) + } _, _ = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { defer panicRecover() @@ -137,16 +148,8 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) { metaManager.deleteEventCount.Add(1) }, }) - go factory.Start(stopCh) - // wait infinite for first cache sync success - for { - if !cache.WaitForCacheSync(stopCh, informer.HasSynced) { - logger.Error(context.Background(), K8sMetaUnifyErrorCode, "service cache sync timeout") - time.Sleep(1 * time.Second) - } else { - break - } - } + go factory.Start(mergedStop) + waitInformerCacheSync(mergedStop, informer.HasSynced, informerCacheSyncOpts{ResourceType: m.resourceType}) } func (m *k8sMetaCache) getFactoryInformer() (informers.SharedInformerFactory, cache.SharedIndexInformer) { @@ -198,15 +201,6 @@ func (m *k8sMetaCache) getFactoryInformer() (informers.SharedInformerFactory, ca if informer == nil { return factory, nil } - // add watch error handler - err := informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) { - if err != nil { - 
logger.Error(context.Background(), K8sMetaUnifyErrorCode, "resourceType", m.resourceType, "watchError", err) - } - }) - if err != nil { - logger.Error(context.Background(), K8sMetaUnifyErrorCode, "fail to handle watch error handler", err) - } return factory, informer } @@ -449,6 +443,7 @@ func containsResource(resources []metav1.APIResource, name string) bool { } return false } + func generateNodeKey(obj interface{}) ([]string, error) { node, err := meta.Accessor(obj) if err != nil { diff --git a/pkg/helper/k8smeta/k8s_meta_const.go b/pkg/helper/k8smeta/k8s_meta_const.go index 61d38f9fe8..8f93ed0723 100644 --- a/pkg/helper/k8smeta/k8s_meta_const.go +++ b/pkg/helper/k8smeta/k8s_meta_const.go @@ -5,6 +5,7 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) const ( @@ -27,7 +28,6 @@ const ( STORAGECLASS = "storageclass" INGRESS = "ingress" CONTAINER = "container" - // entity link type, the direction is from resource which will be trigger to linked resource //revive:disable:var-naming LINK_SPLIT_CHARACTER = "->" POD_NODE = "pod->node" @@ -202,6 +202,18 @@ type IngressNamespace struct { Namespace *v1.Namespace } +// PodCustomResource links a Pod to an arbitrary CR stored as unstructured. +type PodCustomResource struct { + Pod *v1.Pod + CR *unstructured.Unstructured +} + +// NamespaceCustomResource links a Namespace to a namespaced CR (unstructured). 
+type NamespaceCustomResource struct { + Namespace *v1.Namespace + CR *unstructured.Unstructured +} + const ( EventTypeAdd = "add" EventTypeUpdate = "update" diff --git a/pkg/helper/k8smeta/k8s_meta_cr_unified_cache.go b/pkg/helper/k8smeta/k8s_meta_cr_unified_cache.go new file mode 100644 index 0000000000..46ccf35a7c --- /dev/null +++ b/pkg/helper/k8smeta/k8s_meta_cr_unified_cache.go @@ -0,0 +1,304 @@ +package k8smeta + +import ( + "context" + "fmt" + "sync" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + + "github.com/alibaba/ilogtail/pkg/logger" +) + +// crUnifiedCache is a MetaCache for one third-party API resource (dynamic informer + unstructured objects). +type crUnifiedCache struct { + metaStore *DeferredDeletionMetaStore + eventCh chan *K8sMetaEvent + stopCh chan struct{} + + resourceType string + gvr schema.GroupVersionResource + + mu sync.Mutex + dynamicClient dynamic.Interface + discoveryClient discovery.DiscoveryInterface + informer cache.SharedIndexInformer + factory dynamicinformer.DynamicSharedInformerFactory + watchStarted bool + watchStartOnce sync.Once + + giveUp *informerGiveUp +} + +func newCRUnifiedCache(stopCh chan struct{}, resourceType string, gvr schema.GroupVersionResource) *crUnifiedCache { + c := &crUnifiedCache{ + stopCh: stopCh, + resourceType: resourceType, + gvr: gvr, + eventCh: make(chan *K8sMetaEvent, 100), + } + c.metaStore = NewDeferredDeletionMetaStore(c.eventCh, stopCh, 120, cache.MetaNamespaceKeyFunc, generateCommonKey) + c.giveUp = newInformerGiveUp() + return c +} + +func (c *crUnifiedCache) init(_ *kubernetes.Clientset) { + // Built-in clientset unused; dynamic client is wired via setRESTConfig from MetaManager.Init. 
+} + +// SetGVRIfNotStarted updates the informer GVR before the dynamic informer starts; later calls are ignored. +func (c *crUnifiedCache) SetGVRIfNotStarted(gvr schema.GroupVersionResource) { + c.mu.Lock() + defer c.mu.Unlock() + if c.watchStarted { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", "custom resource informer already started; GVR change ignored", "gvr", gvr.String()) + return + } + c.gvr = gvr +} + +func restConfigForDynamicClient(cfg *rest.Config) *rest.Config { + if cfg == nil { + return nil + } + d := *cfg + // Dynamic client + unstructured ListWatch expect JSON; shared *rest.Config uses protobuf for clientset. + d.ContentType = runtime.ContentTypeJSON + d.AcceptContentTypes = runtime.ContentTypeJSON + return &d +} + +func (c *crUnifiedCache) setRESTConfig(cfg *rest.Config) error { + if cfg == nil { + return fmt.Errorf("nil rest.Config") + } + c.mu.Lock() + defer c.mu.Unlock() + dyn, err := dynamic.NewForConfig(restConfigForDynamicClient(cfg)) + if err != nil { + return err + } + c.dynamicClient = dyn + disco, derr := discovery.NewDiscoveryClientForConfig(restConfigForDynamicClient(cfg)) + if derr != nil { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", "discovery client for custom resource informer unavailable; will not pre-check GVR", "resourceType", c.resourceType, "error", derr) + c.discoveryClient = nil + } else { + c.discoveryClient = disco + } + return nil +} + +// gvrDiscoveryAvailable checks discovery for a CRD/plural GVR before starting a dynamic informer +// (same idea as getIngressInformer probing ServerResourcesForGroupVersion in k8s_meta_cache.go). +// +// By design: +// - when discovery reports this GVR unavailable at startup (CRD not installed yet, or plural mismatch), +// this process does not auto-retry to start the informer later; +// - operator should fix CRD/Resource and restart LoongCollector to enable this CR watcher. 
+func gvrDiscoveryAvailable(d discovery.DiscoveryInterface, gvr schema.GroupVersionResource) bool { + if d == nil { + return true + } + gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}.String() + resourceList, err := d.ServerResourcesForGroupVersion(gv) + if err != nil { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", + "custom resource API group/version not available on server; skipping informer", "gvr", gvr.String(), "error", err) + return false + } + if !containsResource(resourceList.APIResources, gvr.Resource) { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", + "custom resource plural not listed for group/version; skipping informer", "gvr", gvr.String()) + return false + } + return true +} + +// EnsureWatchStarted starts the dynamic informer (once) when the dynamic client is ready. +// Important: never enter sync.Once when dynamicClient is nil. +func (c *crUnifiedCache) EnsureWatchStarted() { + c.mu.Lock() + dyn := c.dynamicClient + c.mu.Unlock() + if dyn == nil { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", "dynamic client not ready, skip custom resource informer; ensure MetaManager.Init completed") + return + } + c.watchStartOnce.Do(func() { + c.mu.Lock() + c.metaStore.Start() + gvr := c.gvr + if !gvrDiscoveryAvailable(c.discoveryClient, gvr) { + // By design: discovery failure is treated as terminal for this process lifetime. + // We intentionally mark watch as started to avoid repeated start attempts and log storms. + // After CRD/resource is fixed, restart LoongCollector to recover this CR informer. 
+ c.watchStarted = true + c.mu.Unlock() + return + } + c.factory = dynamicinformer.NewDynamicSharedInformerFactory(dyn, time.Hour) + c.informer = c.factory.ForResource(c.gvr).Informer() + _, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + defer panicRecover() + u := trimmedCRCopyFromInformer(obj, c.resourceType) + if u == nil { + return + } + now := time.Now().Unix() + c.eventCh <- &K8sMetaEvent{ + EventType: EventTypeAdd, + Object: &ObjectWrapper{ + ResourceType: c.resourceType, + Raw: u, + FirstObservedTime: now, + LastObservedTime: now, + }, + } + metaManager.addEventCount.Add(1) + }, + UpdateFunc: func(_, obj interface{}) { + defer panicRecover() + u := trimmedCRCopyFromInformer(obj, c.resourceType) + if u == nil { + return + } + now := time.Now().Unix() + c.eventCh <- &K8sMetaEvent{ + EventType: EventTypeUpdate, + Object: &ObjectWrapper{ + ResourceType: c.resourceType, + Raw: u, + FirstObservedTime: now, + LastObservedTime: now, + }, + } + metaManager.updateEventCount.Add(1) + }, + DeleteFunc: func(obj interface{}) { + defer panicRecover() + u := trimmedCRCopyFromInformer(obj, c.resourceType) + if u == nil { + return + } + c.eventCh <- &K8sMetaEvent{ + EventType: EventTypeDelete, + Object: &ObjectWrapper{ + ResourceType: c.resourceType, + Raw: u, + LastObservedTime: time.Now().Unix(), + }, + } + metaManager.deleteEventCount.Add(1) + }, + }) + if err != nil { + logger.Error(context.Background(), K8sMetaUnifyErrorCode, "msg", "fail to add dynamic informer event handler", "error", err, "resourceType", c.resourceType, "gvr", c.gvr.String()) + } + if err := attachWatchErrorHandler(c.informer, c.giveUp, watchErrorHandlerOpts{ + ResourceType: c.resourceType, + GVR: c.gvr.String(), + GiveUpStopMsg: "stopping dynamic informer after repeated errors (RBAC/auth or missing API resource; no further retries)", + }); err != nil { + logger.Error(context.Background(), K8sMetaUnifyErrorCode, "msg", "fail to set dynamic 
informer watch error handler", "error", err) + } + c.watchStarted = true + inf := c.informer + c.mu.Unlock() + + mergedStop := c.giveUp.mergedStop(c.stopCh) + go c.factory.Start(mergedStop) + gvrStr := gvr.String() + go waitInformerCacheSync(mergedStop, inf.HasSynced, informerCacheSyncOpts{ResourceType: c.resourceType, GVR: gvrStr}) + }) +} + +func (c *crUnifiedCache) watch(<-chan struct{}) {} + +func (c *crUnifiedCache) Get(key []string) map[string][]*ObjectWrapper { + return c.metaStore.Get(key) +} + +func (c *crUnifiedCache) GetSize() int { + return len(c.metaStore.Items) +} + +func (c *crUnifiedCache) GetQueueSize() int { + return len(c.eventCh) +} + +func (c *crUnifiedCache) List() []*ObjectWrapper { + return c.metaStore.List() +} + +func (c *crUnifiedCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper { + return c.metaStore.Filter(filterFunc, limit) +} + +func (c *crUnifiedCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) { + c.EnsureWatchStarted() + c.metaStore.RegisterSendFunc(key, sendFunc, interval) + logger.Debug(context.Background(), "register send func", c.resourceType) +} + +func (c *crUnifiedCache) UnRegisterSendFunc(key string) { + c.metaStore.UnRegisterSendFunc(key) +} + +// trimmedCRCopyFromInformer builds a detached object for the meta cache without full-object DeepCopy: +// copies apiVersion/kind, metadata (without managedFields), and status via NestedFieldCopy — spec is omitted. +// This avoids mutating the informer-shared *unstructured.Unstructured and skips copying large spec blobs. 
+func trimmedCRCopyFromInformer(obj interface{}, resourceType string) *unstructured.Unstructured { + switch t := obj.(type) { + case *unstructured.Unstructured: + return buildTrimmedCRCopy(t, resourceType) + case cache.DeletedFinalStateUnknown: + if u, ok := t.Obj.(*unstructured.Unstructured); ok { + return buildTrimmedCRCopy(u, resourceType) + } + } + return nil +} + +func buildTrimmedCRCopy(u *unstructured.Unstructured, resourceType string) *unstructured.Unstructured { + if u == nil { + return nil + } + out := &unstructured.Unstructured{Object: make(map[string]interface{})} + if gv := u.GetAPIVersion(); gv != "" { + out.SetAPIVersion(gv) + } + if k := u.GetKind(); k != "" { + out.SetKind(k) + } + metaVal, metaFound, metaErr := unstructured.NestedFieldCopy(u.Object, "metadata") + if metaErr != nil { + logger.Debug(context.Background(), K8sMetaUnifyErrorCode, "nested copy metadata for CR cache", metaErr, "resourceType", resourceType) + } else if metaFound { + if metaMap, ok := metaVal.(map[string]interface{}); ok { + delete(metaMap, "managedFields") + if err := unstructured.SetNestedMap(out.Object, metaMap, "metadata"); err != nil { + logger.Debug(context.Background(), K8sMetaUnifyErrorCode, "set metadata on trimmed CR", err, "resourceType", resourceType) + } + } + } + statusVal, statusFound, statusErr := unstructured.NestedFieldCopy(u.Object, "status") + if statusErr != nil { + logger.Debug(context.Background(), K8sMetaUnifyErrorCode, "nested copy status for CR cache", statusErr, "resourceType", resourceType) + } else if statusFound { + if err := unstructured.SetNestedField(out.Object, statusVal, "status"); err != nil { + logger.Debug(context.Background(), K8sMetaUnifyErrorCode, "set status on trimmed CR", err, "resourceType", resourceType) + } + } + return out +} diff --git a/pkg/helper/k8smeta/k8s_meta_custom_resource.go b/pkg/helper/k8smeta/k8s_meta_custom_resource.go new file mode 100644 index 0000000000..32ec4f1367 --- /dev/null +++ 
b/pkg/helper/k8smeta/k8s_meta_custom_resource.go @@ -0,0 +1,102 @@ +package k8smeta + +import ( + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// CustomResourceCollectorConfig describes one third-party API resource collected via a dynamic informer. +// Use MetaManager.RegisterCustomResourceCollector after GetMetaManagerInstance and before or after Init +// (late registration will receive the REST config stored at Init). +// +// YAML-friendly field names match JSON tags when used in pipeline configs. +type CustomResourceCollectorConfig struct { + // EntityType is required: internal cache key and K8sMetaEvent.ResourceType (e.g. customresource/argoproj.io/workflow). + // It drives __entity_type__, __entity_id__, and pod->{EntityType} links — set explicitly in pipeline config. + EntityType string `json:"EntityType,omitempty"` + + APIGroup string `json:"APIGroup,omitempty"` + APIVersion string `json:"APIVersion,omitempty"` + Resource string `json:"Resource,omitempty"` // plural resource name + Kind string `json:"Kind,omitempty"` // Kubernetes kind, for ownerReferences matching and export + + // PodLink, if set, registers a Pod → this CR link generator (link type: PodLinkTypeForEntity(EntityType)). + PodLink *PodToCustomResourceLinkConfig `json:"PodLink,omitempty"` + // CollectEntity registers entity collection (K8sMetaEvent stream) for this CR. + CollectEntity bool `json:"CollectEntity,omitempty"` + // Entity2PodRelation is __relation_type__ on entity_link logs (custom resource → Pod). Required when Pod link export is enabled together with PodLink. + Entity2PodRelation string `json:"Entity2PodRelation,omitempty"` + // Namespace2EntityRelation is __relation_type__ on entity_link logs (Namespace → this namespaced CR). Export when CollectEntity, Namespace input, and this string are all set. Cluster-scoped CRs are skipped. 
+ Namespace2EntityRelation string `json:"Namespace2EntityRelation,omitempty"` + + // EnableLabels, if true, exports full labels on entity logs. Ignores ServiceK8sMeta.EnableLabels. Default false. + EnableLabels bool `json:"EnableLabels,omitempty"` + // EnableAnnotations, if true, exports full annotations on entity logs. Ignores ServiceK8sMeta.EnableAnnotations. Default false. + EnableAnnotations bool `json:"EnableAnnotations,omitempty"` +} + +// PodToCustomResourceLinkConfig resolves which Workflow-like object a Pod belongs to. +type PodToCustomResourceLinkConfig struct { + // OwnerKind matches Pod ownerReferences[].Kind (e.g. Workflow). + OwnerKind string `json:"OwnerKind,omitempty"` + // OwnerAPIGroupContains is matched as substring of ownerReferences[].APIVersion (empty => use collector APIGroup). + OwnerAPIGroupContains string `json:"OwnerAPIGroupContains,omitempty"` + // PodLabelKey fallback when no matching ownerRef (e.g. workflows.argoproj.io/workflow). + PodLabelKey string `json:"PodLabelKey,omitempty"` +} + +// PodLinkTypeForEntity returns the link ResourceType for RegisterSendFunc (e.g. pod->customresource/...). +func PodLinkTypeForEntity(entityType string) string { + return POD + LINK_SPLIT_CHARACTER + entityType +} + +// NamespaceLinkTypeForEntity is the link ResourceType for Namespace → namespaced CR (e.g. argo.workflow->namespace). +func NamespaceLinkTypeForEntity(entityType string) string { + return entityType + LINK_SPLIT_CHARACTER + NAMESPACE +} + +// DefaultEntityType returns the conventional type string customresource/<apiGroup>/<kind>. +// It does not apply automatically; EntityType must still be set on the config (Normalize requires it). +func DefaultEntityType(apiGroup, kind string) string { + return fmt.Sprintf("customresource/%s/%s", strings.ToLower(strings.TrimSpace(apiGroup)), strings.ToLower(strings.TrimSpace(kind))) +} + +// ToGVR returns the GroupVersionResource for the dynamic informer. 
+func (c *CustomResourceCollectorConfig) ToGVR() schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: c.APIGroup, + Version: c.APIVersion, + Resource: c.Resource, + } +} + +// Normalize validates and fills PodLink defaults. EntityType must be non-empty. Call before RegisterCustomResourceCollector. +func (c *CustomResourceCollectorConfig) Normalize() error { + c.APIGroup = strings.TrimSpace(c.APIGroup) + c.APIVersion = strings.TrimSpace(c.APIVersion) + c.Resource = strings.TrimSpace(c.Resource) + c.Kind = strings.TrimSpace(c.Kind) + c.EntityType = strings.TrimSpace(c.EntityType) + c.Namespace2EntityRelation = strings.TrimSpace(c.Namespace2EntityRelation) + + if c.APIGroup == "" || c.APIVersion == "" || c.Resource == "" || c.Kind == "" { + return fmt.Errorf("custom resource collector: APIGroup, APIVersion, Resource, and Kind are required") + } + if c.EntityType == "" { + return fmt.Errorf("custom resource collector: EntityType is required") + } + if pl := c.PodLink; pl != nil { + pl.OwnerKind = strings.TrimSpace(pl.OwnerKind) + pl.OwnerAPIGroupContains = strings.TrimSpace(pl.OwnerAPIGroupContains) + pl.PodLabelKey = strings.TrimSpace(pl.PodLabelKey) + if pl.OwnerKind == "" { + pl.OwnerKind = c.Kind + } + if pl.OwnerAPIGroupContains == "" { + pl.OwnerAPIGroupContains = c.APIGroup + } + } + return nil +} diff --git a/pkg/helper/k8smeta/k8s_meta_informer_giveup.go b/pkg/helper/k8smeta/k8s_meta_informer_giveup.go new file mode 100644 index 0000000000..0a31e1b032 --- /dev/null +++ b/pkg/helper/k8smeta/k8s_meta_informer_giveup.go @@ -0,0 +1,50 @@ +package k8smeta + +import ( + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// informerGiveUpFailureThreshold is how many consecutive Reflector ListAndWatch errors +// of a “give up” class (RBAC, missing API resource, etc.) trigger stopping the informer factory. 
+const informerGiveUpFailureThreshold = 3 + +func isInformerAuthFailure(err error) bool { + if err == nil { + return false + } + if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) { + return true + } + switch apierrors.ReasonForError(err) { + case metav1.StatusReasonForbidden, metav1.StatusReasonUnauthorized: + return true + default: + return false + } +} + +// isInformerPermanentResourceFailure matches Reflector ListAndWatch errors that will not +// recover without cluster changes (e.g. CRD not installed: "the server could not find the requested resource"). +func isInformerPermanentResourceFailure(err error) bool { + if err == nil { + return false + } + if apierrors.IsNotFound(err) { + return true + } + if meta.IsNoMatchError(err) { + return true + } + switch apierrors.ReasonForError(err) { + case metav1.StatusReasonNotFound: + return true + default: + return false + } +} + +func isInformerGiveUpFailure(err error) bool { + return isInformerAuthFailure(err) || isInformerPermanentResourceFailure(err) +} diff --git a/pkg/helper/k8smeta/k8s_meta_informer_lifecycle.go b/pkg/helper/k8smeta/k8s_meta_informer_lifecycle.go new file mode 100644 index 0000000000..4b511f6910 --- /dev/null +++ b/pkg/helper/k8smeta/k8s_meta_informer_lifecycle.go @@ -0,0 +1,136 @@ +// Copyright 2021 iLogtail Authors +// +// Shared informer lifecycle helpers: merge global stop with give-up channel, +// Reflector watch-error give-up counting, and cache sync with exponential backoff. + +package k8smeta + +import ( + "context" + "sync" + "time" + + "k8s.io/client-go/tools/cache" + + "github.com/alibaba/ilogtail/pkg/logger" +) + +// informerGiveUp holds state for merging shutdown with “give up” after repeated +// non-recoverable Reflector errors (see informerGiveUpFailureThreshold). 
+type informerGiveUp struct { + mu sync.Mutex + count int + ch chan struct{} + once sync.Once +} + +func newInformerGiveUp() *informerGiveUp { + return &informerGiveUp{ch: make(chan struct{})} +} + +// mergedStop returns a channel that closes when either globalStop or give-up fires. +func (g *informerGiveUp) mergedStop(globalStop <-chan struct{}) chan struct{} { + merged := make(chan struct{}) + go func() { + select { + case <-globalStop: + case <-g.ch: + } + close(merged) + }() + return merged +} + +// watchErrorHandlerOpts configures logging for attachWatchErrorHandler. +type watchErrorHandlerOpts struct { + ResourceType string + GVR string // optional; when non-empty, included in Error and Warning logs + GiveUpStopMsg string // Warning message body on give-up (first kv key in logger.Warning) +} + +// attachWatchErrorHandler registers SetWatchErrorHandler with give-up counting. +func attachWatchErrorHandler(informer cache.SharedIndexInformer, g *informerGiveUp, o watchErrorHandlerOpts) error { + return informer.SetWatchErrorHandler(func(_ *cache.Reflector, err error) { + if err == nil { + return + } + kvs := []interface{}{"resourceType", o.ResourceType, "watchError", err} + if o.GVR != "" { + kvs = append(kvs, "gvr", o.GVR) + } + logger.Error(context.Background(), K8sMetaUnifyErrorCode, kvs...) + if !isInformerGiveUpFailure(err) { + return + } + g.mu.Lock() + g.count++ + n := g.count + g.mu.Unlock() + if n < informerGiveUpFailureThreshold { + return + } + g.once.Do(func() { + var wkvs []interface{} + if o.GVR != "" { + wkvs = []interface{}{ + "msg", o.GiveUpStopMsg, + "resourceType", o.ResourceType, + "gvr", o.GVR, + "failures", n, + } + } else { + wkvs = []interface{}{ + "msg", o.GiveUpStopMsg, + "resourceType", o.ResourceType, + "failures", n, + } + } + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, wkvs...) + close(g.ch) + }) + }) +} + +// informerCacheSyncOpts configures logging for waitInformerCacheSync. 
+type informerCacheSyncOpts struct { + ResourceType string + // GVR non-empty selects CR-style log lines (dynamic informer + gvr) and logs success on sync. + GVR string +} + +// waitInformerCacheSync blocks until hasSynced or mergedStop is closed, with exponential backoff +// between WaitForCacheSync polls (same as previous k8sMetaCache.watch / crUnifiedCache loops). +func waitInformerCacheSync(mergedStop <-chan struct{}, hasSynced cache.InformerSynced, o informerCacheSyncOpts) { + backoff := time.Second + const maxBackoff = 10 * time.Second + for { + if cache.WaitForCacheSync(mergedStop, hasSynced) { + if o.GVR != "" { + logger.Info(context.Background(), "msg", "dynamic informer cache synced", "gvr", o.GVR) + } + return + } + select { + case <-mergedStop: + if o.GVR != "" { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", "dynamic informer cache sync aborted", "gvr", o.GVR) + } else { + logger.Warning(context.Background(), K8sMetaUnifyErrorCode, "msg", "informer cache sync aborted", "resourceType", o.ResourceType) + } + return + default: + } + if o.GVR != "" { + logger.Error(context.Background(), K8sMetaUnifyErrorCode, "dynamic informer cache sync timeout", "gvr", o.GVR, "nextRetryIn", backoff.String()) + } else { + logger.Error(context.Background(), K8sMetaUnifyErrorCode, "service cache sync timeout", "resourceType", o.ResourceType, "nextRetryIn", backoff.String()) + } + time.Sleep(backoff) + if backoff < maxBackoff { + backoff *= 2 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + } +} diff --git a/pkg/helper/k8smeta/k8s_meta_link.go b/pkg/helper/k8smeta/k8s_meta_link.go index b533f724d5..72605eb20b 100644 --- a/pkg/helper/k8smeta/k8s_meta_link.go +++ b/pkg/helper/k8smeta/k8s_meta_link.go @@ -2,22 +2,59 @@ package k8smeta import ( "strings" + "sync" app "k8s.io/api/apps/v1" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" 
"k8s.io/apimachinery/pkg/labels" ) type LinkGenerator struct { - metaCache map[string]MetaCache + metaCache map[string]MetaCache + metaCacheMu *sync.RWMutex // same mutex as MetaManager.cacheMu; guards map structure for RegisterCustomResourceCollector + + podCRMu sync.RWMutex + podCRByLinkType map[string]*podCRLinkRuntime +} + +type podCRLinkRuntime struct { + entityType string + ownerKind string + ownerAPIGroupSubstr string + podLabelKey string } -func NewK8sMetaLinkGenerator(metaCache map[string]MetaCache) *LinkGenerator { +func NewK8sMetaLinkGenerator(metaCache map[string]MetaCache, metaCacheMu *sync.RWMutex) *LinkGenerator { return &LinkGenerator{ - metaCache: metaCache, + metaCache: metaCache, + metaCacheMu: metaCacheMu, + podCRByLinkType: make(map[string]*podCRLinkRuntime), + } +} + +func (g *LinkGenerator) registerPodCRLink(linkType string, rt *podCRLinkRuntime) { + if g == nil || linkType == "" || rt == nil { + return + } + g.podCRMu.Lock() + defer g.podCRMu.Unlock() + if g.podCRByLinkType == nil { + g.podCRByLinkType = make(map[string]*podCRLinkRuntime) + } + g.podCRByLinkType[linkType] = rt +} + +func (g *LinkGenerator) podCRRuntimeForLinkType(linkType string) (*podCRLinkRuntime, bool) { + if g == nil { + return nil, false } + g.podCRMu.RLock() + defer g.podCRMu.RUnlock() + rt, ok := g.podCRByLinkType[linkType] + return rt, ok } func (g *LinkGenerator) GenerateLinks(events []*K8sMetaEvent, linkType string) []*K8sMetaEvent { @@ -29,6 +66,16 @@ func (g *LinkGenerator) GenerateLinks(events []*K8sMetaEvent, linkType string) [ if !strings.HasPrefix(linkType, resourceType) { return nil } + if g.metaCacheMu != nil { + g.metaCacheMu.RLock() + defer g.metaCacheMu.RUnlock() + } + // CustomResource links (third-party CR): + // 1) Pod→CR when linkType is registered via PodLink (before built-in switch). + // 2) namespaced CR→Namespace when linkType is "->namespace" (after switch, so built-in *->namespace kinds stay in cases above). 
+ if rt, ok := g.podCRRuntimeForLinkType(linkType); ok { + return g.getPodCustomResourceLink(events, rt, linkType) + } switch linkType { case POD_NODE: return g.getPodNodeLink(events) @@ -76,9 +123,13 @@ func (g *LinkGenerator) GenerateLinks(events []*K8sMetaEvent, linkType string) [ return g.getPVCNamespaceLink(events) case INGRESS_NAMESPACE: return g.getIngressNamespaceLink(events) - default: - return nil } + // CustomResource links (third-party CR): + // 2) namespaced CR→Namespace when linkType is "->namespace" (after switch, so built-in *->namespace kinds stay in cases above). + if strings.HasSuffix(linkType, LINK_SPLIT_CHARACTER+NAMESPACE) { + return g.getCustomResourceNamespaceLink(events) + } + return nil } func (g *LinkGenerator) getPodNodeLink(podList []*K8sMetaEvent) []*K8sMetaEvent { @@ -565,6 +616,108 @@ func (g *LinkGenerator) getIngressServiceLink(ingressList []*K8sMetaEvent) []*K8 return result } +func (g *LinkGenerator) crNameFromPod(pod *v1.Pod, rt *podCRLinkRuntime) (namespace, name string, ok bool) { + if pod == nil || rt == nil { + return "", "", false + } + for _, ref := range pod.OwnerReferences { + if ref.Kind == rt.ownerKind && strings.Contains(ref.APIVersion, rt.ownerAPIGroupSubstr) { + return pod.Namespace, ref.Name, true + } + } + if rt.podLabelKey != "" && pod.Labels != nil { + if v := pod.Labels[rt.podLabelKey]; v != "" { + return pod.Namespace, v, true + } + } + return "", "", false +} + +func (g *LinkGenerator) getPodCustomResourceLink(podList []*K8sMetaEvent, rt *podCRLinkRuntime, linkType string) []*K8sMetaEvent { + if rt == nil { + return nil + } + crCache := g.metaCache[rt.entityType] + if crCache == nil { + return nil + } + result := make([]*K8sMetaEvent, 0) + for _, data := range podList { + pod, ok := data.Object.Raw.(*v1.Pod) + if !ok { + continue + } + ns, resName, found := g.crNameFromPod(pod, rt) + if !found || resName == "" { + continue + } + items := crCache.Get([]string{generateNameWithNamespaceKey(ns, resName)}) + for 
_, group := range items { + for _, w := range group { + u, ok := w.Raw.(*unstructured.Unstructured) + if !ok { + continue + } + result = append(result, &K8sMetaEvent{ + EventType: data.EventType, + Object: &ObjectWrapper{ + ResourceType: linkType, + Raw: &PodCustomResource{ + Pod: pod, + CR: u, + }, + FirstObservedTime: data.Object.FirstObservedTime, + LastObservedTime: data.Object.LastObservedTime, + }, + }) + } + } + } + return result +} + +func (g *LinkGenerator) getCustomResourceNamespaceLink(events []*K8sMetaEvent) []*K8sMetaEvent { + if len(events) == 0 { + return nil + } + entityType := events[0].Object.ResourceType + nsCache := g.metaCache[NAMESPACE] + if nsCache == nil { + return nil + } + result := make([]*K8sMetaEvent, 0) + for _, data := range events { + u, ok := data.Object.Raw.(*unstructured.Unstructured) + if !ok { + continue + } + nsName := u.GetNamespace() + if nsName == "" { + continue + } + nsList := nsCache.Get([]string{generateNameWithNamespaceKey("", nsName)}) + for _, ns := range nsList { + for _, n := range ns { + if namespace, ok := n.Raw.(*v1.Namespace); ok { + result = append(result, &K8sMetaEvent{ + EventType: data.EventType, + Object: &ObjectWrapper{ + ResourceType: NamespaceLinkTypeForEntity(entityType), + Raw: &NamespaceCustomResource{ + Namespace: namespace, + CR: u, + }, + FirstObservedTime: data.Object.FirstObservedTime, + LastObservedTime: data.Object.LastObservedTime, + }, + }) + } + } + } + } + return result +} + func (g *LinkGenerator) getPodNamespaceLink(podList []*K8sMetaEvent) []*K8sMetaEvent { result := make([]*K8sMetaEvent, 0) for _, data := range podList { diff --git a/pkg/helper/k8smeta/k8s_meta_link_test.go b/pkg/helper/k8smeta/k8s_meta_link_test.go index cd490ece04..fe91ff9210 100644 --- a/pkg/helper/k8smeta/k8s_meta_link_test.go +++ b/pkg/helper/k8smeta/k8s_meta_link_test.go @@ -4,13 +4,22 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" app "k8s.io/api/apps/v1" batch 
"k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" ) +// testLinkGenerator returns a LinkGenerator for unit tests (nil metaCacheMu; tests do not +// concurrently mutate the shared cache map). +func testLinkGenerator(metaCache map[string]MetaCache) *LinkGenerator { + return NewK8sMetaLinkGenerator(metaCache, nil) +} + func TestGetPodNodeLink(t *testing.T) { podCache := newK8sMetaCache(make(chan struct{}), POD) nodeCache := newK8sMetaCache(make(chan struct{}), NODE) @@ -46,7 +55,7 @@ func TestGetPodNodeLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, NODE: nodeCache, }) @@ -162,7 +171,7 @@ func TestGetPodDeploymentLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, REPLICASET: replicasetCache, DEPLOYMENT: deploymentCache, @@ -256,7 +265,7 @@ func TestGetReplicaSetDeploymentLink(t *testing.T) { }, }, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ REPLICASET: replicasetCache, DEPLOYMENT: deploymentCache, }) @@ -337,7 +346,7 @@ func TestGetPodReplicaSetLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, REPLICASET: replicasetCache, }) @@ -404,7 +413,7 @@ func TestGetPodDaemonSetLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, DAEMONSET: 
daemonsetCache, }) @@ -471,7 +480,7 @@ func TestGetPodStatefulSetLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, STATEFULSET: statefulsetCache, }) @@ -538,7 +547,7 @@ func TestGetPodJobLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, JOB: jobCache, }) @@ -619,7 +628,7 @@ func TestGetJobCronJobLink(t *testing.T) { EventType: "add", Object: job2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ JOB: jobCache, CRONJOB: cronJobCache, }) @@ -694,7 +703,7 @@ func TestGetPodPVCLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, PERSISTENTVOLUMECLAIM: pvcCache, }) @@ -773,7 +782,7 @@ func TestGetPodConfigMapLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, CONFIGMAP: configMapCache, }) @@ -844,7 +853,7 @@ func TestGetPodServiceLink(t *testing.T) { EventType: "add", Object: pod2, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, SERVICE: serviceCache, }) @@ -874,7 +883,7 @@ func TestGetPodContainerLink(t *testing.T) { EventType: "add", Object: generateMockPod("2"), }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, }) podList := []*K8sMetaEvent{ @@ -988,7 +997,7 @@ func TestGetIngressServiceLink(t *testing.T) { }, }, }) - linkGenerator := 
NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ INGRESS: ingressCache, SERVICE: serviceCache, }) @@ -1037,7 +1046,7 @@ func TestGetPodNamespaceLink(t *testing.T) { EventType: "add", Object: pod3, }) - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ POD: podCache, NAMESPACE: namespaceCache, }) @@ -1122,7 +1131,7 @@ func TestGetServiceNamespaceLink(t *testing.T) { Object: service2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ SERVICE: serviceCache, NAMESPACE: namespaceCache, }) @@ -1183,7 +1192,7 @@ func TestGetDeploymentNamespaceLink(t *testing.T) { Object: deployment2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ DEPLOYMENT: deploymentCache, NAMESPACE: namespaceCache, }) @@ -1244,7 +1253,7 @@ func TestGetDaemonSetNamespaceLink(t *testing.T) { Object: daemonset2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ DAEMONSET: daemonSetCache, NAMESPACE: namespaceCache, }) @@ -1304,7 +1313,7 @@ func TestGetStatefulSetNamespaceLink(t *testing.T) { Object: statefulSet2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ DAEMONSET: statefulSetCache, NAMESPACE: namespaceCache, }) @@ -1364,7 +1373,7 @@ func TestGetConfigMapNamespaceLink(t *testing.T) { Object: configmap2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ CONFIGMAP: configmapCache, NAMESPACE: namespaceCache, }) @@ -1424,7 +1433,7 @@ func TestGetJobNamespaceLink(t *testing.T) { Object: job2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := 
testLinkGenerator(map[string]MetaCache{ JOB: jobCache, NAMESPACE: namespaceCache, }) @@ -1484,7 +1493,7 @@ func TestGetCronJobNamespaceLink(t *testing.T) { Object: cronjob2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ CRONJOB: cronjobCache, NAMESPACE: namespaceCache, }) @@ -1544,7 +1553,7 @@ func TestGetPVCNamespaceLink(t *testing.T) { Object: pvc2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ PERSISTENTVOLUMECLAIM: pvcCache, NAMESPACE: namespaceCache, }) @@ -1604,7 +1613,7 @@ func TestGetIngressNamespaceLink(t *testing.T) { Object: ingress2, }, } - linkGenerator := NewK8sMetaLinkGenerator(map[string]MetaCache{ + linkGenerator := testLinkGenerator(map[string]MetaCache{ INGRESS: ingressCache, NAMESPACE: namespaceCache, }) @@ -1647,3 +1656,195 @@ func generateMockPod(index string) *ObjectWrapper { }, } } + +func testArgoWorkflowCR(name string) *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetAPIVersion("argoproj.io/v1alpha1") + u.SetKind("Workflow") + u.SetNamespace("default") + u.SetName(name) + return u +} + +func testPodCRLinkRuntime() *podCRLinkRuntime { + const entityType = "argo.workflow" + return &podCRLinkRuntime{ + entityType: entityType, + ownerKind: "Workflow", + ownerAPIGroupSubstr: "argoproj.io", + podLabelKey: "workflows.argoproj.io/workflow", + } +} + +// TestGetPodCustomResourceLinkViaOwnerReference covers Pod→CR resolution when the Pod has a matching Workflow ownerReference. 
+func TestGetPodCustomResourceLinkViaOwnerReference(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + entityType := "argo.workflow" + linkType := PodLinkTypeForEntity(entityType) + + podCache := newK8sMetaCache(stopCh, POD) + crCache := newCRUnifiedCache(stopCh, entityType, schema.GroupVersionResource{Group: "argoproj.io", Version: "v1alpha1", Resource: "workflows"}) + crCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{ + EventType: EventTypeAdd, + Object: &ObjectWrapper{ + ResourceType: entityType, + Raw: testArgoWorkflowCR("my-wf"), + }, + }) + + podW := generateMockPod("1") + podW.ResourceType = POD + podW.Raw.(*corev1.Pod).OwnerReferences = []metav1.OwnerReference{{ + APIVersion: "argoproj.io/v1alpha1", + Kind: "Workflow", + Name: "my-wf", + UID: "uid-wf", + }} + podCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{EventType: EventTypeAdd, Object: podW}) + + lg := testLinkGenerator(map[string]MetaCache{ + POD: podCache, + entityType: crCache, + }) + lg.registerPodCRLink(linkType, testPodCRLinkRuntime()) + + podList := []*K8sMetaEvent{{EventType: EventTypeUpdate, Object: podCache.metaStore.Items["default/pod1"]}} + results := lg.GenerateLinks(podList, linkType) + require.Len(t, results, 1) + pcr, ok := results[0].Object.Raw.(*PodCustomResource) + require.True(t, ok) + assert.Equal(t, "my-wf", pcr.CR.GetName()) + assert.Equal(t, "pod1", pcr.Pod.Name) + assert.Equal(t, linkType, results[0].Object.ResourceType) +} + +// TestGetPodCustomResourceLinkViaLabelFallback covers Pod→CR when ownerReferences do not match but PodLabelKey is set. 
+func TestGetPodCustomResourceLinkViaLabelFallback(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + entityType := "argo.workflow" + linkType := PodLinkTypeForEntity(entityType) + + podCache := newK8sMetaCache(stopCh, POD) + crCache := newCRUnifiedCache(stopCh, entityType, schema.GroupVersionResource{Group: "argoproj.io", Version: "v1alpha1", Resource: "workflows"}) + crCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{ + EventType: EventTypeAdd, + Object: &ObjectWrapper{ + ResourceType: entityType, + Raw: testArgoWorkflowCR("wf-from-label"), + }, + }) + + podW := generateMockPod("2") + podW.ResourceType = POD + pod := podW.Raw.(*corev1.Pod) + pod.OwnerReferences = []metav1.OwnerReference{{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "some-rs", + }} + pod.Labels = map[string]string{"workflows.argoproj.io/workflow": "wf-from-label"} + podCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{EventType: EventTypeAdd, Object: podW}) + + lg := testLinkGenerator(map[string]MetaCache{ + POD: podCache, + entityType: crCache, + }) + lg.registerPodCRLink(linkType, testPodCRLinkRuntime()) + + podList := []*K8sMetaEvent{{EventType: EventTypeUpdate, Object: podCache.metaStore.Items["default/pod2"]}} + results := lg.GenerateLinks(podList, linkType) + require.Len(t, results, 1) + pcr := results[0].Object.Raw.(*PodCustomResource) + assert.Equal(t, "wf-from-label", pcr.CR.GetName()) +} + +// TestGetCustomResourceNamespaceLink covers Namespace→namespaced CR links using the namespace cache. 
+func TestGetCustomResourceNamespaceLink(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + entityType := "argo.workflow" + linkType := NamespaceLinkTypeForEntity(entityType) + + nsCache := newK8sMetaCache(stopCh, NAMESPACE) + nsCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{ + EventType: EventTypeAdd, + Object: generateMockNamespace("default"), + }) + + wf := testArgoWorkflowCR("ns-linked-wf") + events := []*K8sMetaEvent{{ + EventType: EventTypeAdd, + Object: &ObjectWrapper{ + ResourceType: entityType, + Raw: wf, + }, + }} + + lg := testLinkGenerator(map[string]MetaCache{NAMESPACE: nsCache}) + results := lg.GenerateLinks(events, linkType) + require.Len(t, results, 1) + ncr, ok := results[0].Object.Raw.(*NamespaceCustomResource) + require.True(t, ok) + assert.Equal(t, "default", ncr.Namespace.Name) + assert.Equal(t, "ns-linked-wf", ncr.CR.GetName()) + assert.Equal(t, linkType, results[0].Object.ResourceType) +} + +// TestGetPodCustomResourceLinkMissingCRCache verifies GenerateLinks returns nil when the CR MetaCache entry is absent. +func TestGetPodCustomResourceLinkMissingCRCache(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + entityType := "argo.workflow" + linkType := PodLinkTypeForEntity(entityType) + + podCache := newK8sMetaCache(stopCh, POD) + podW := generateMockPod("3") + podW.ResourceType = POD + podW.Raw.(*corev1.Pod).OwnerReferences = []metav1.OwnerReference{{ + APIVersion: "argoproj.io/v1alpha1", + Kind: "Workflow", + Name: "ghost", + }} + podCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{EventType: EventTypeAdd, Object: podW}) + + // Intentionally omit entityType from metaCache (no CR cache registered). 
+ lg := testLinkGenerator(map[string]MetaCache{POD: podCache}) + lg.registerPodCRLink(linkType, testPodCRLinkRuntime()) + + podList := []*K8sMetaEvent{{EventType: EventTypeUpdate, Object: podCache.metaStore.Items["default/pod3"]}} + results := lg.GenerateLinks(podList, linkType) + assert.Nil(t, results) +} + +// TestGetPodCustomResourceLinkCRCacheHitMiss verifies no links when the CR is not present in cache (cache exists, Get empty). +func TestGetPodCustomResourceLinkCRCacheHitMiss(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + entityType := "argo.workflow" + linkType := PodLinkTypeForEntity(entityType) + + podCache := newK8sMetaCache(stopCh, POD) + crCache := newCRUnifiedCache(stopCh, entityType, schema.GroupVersionResource{Group: "argoproj.io", Version: "v1alpha1", Resource: "workflows"}) + // CR cache empty: Pod points to a workflow name not in cache. + + podW := generateMockPod("4") + podW.ResourceType = POD + podW.Raw.(*corev1.Pod).OwnerReferences = []metav1.OwnerReference{{ + APIVersion: "argoproj.io/v1alpha1", + Kind: "Workflow", + Name: "not-in-cache", + }} + podCache.metaStore.handleAddOrUpdateEvent(&K8sMetaEvent{EventType: EventTypeAdd, Object: podW}) + + lg := testLinkGenerator(map[string]MetaCache{ + POD: podCache, + entityType: crCache, + }) + lg.registerPodCRLink(linkType, testPodCRLinkRuntime()) + + podList := []*K8sMetaEvent{{EventType: EventTypeUpdate, Object: podCache.metaStore.Items["default/pod4"]}} + results := lg.GenerateLinks(podList, linkType) + assert.Empty(t, results) +} diff --git a/pkg/helper/k8smeta/k8s_meta_manager.go b/pkg/helper/k8smeta/k8s_meta_manager.go index 76887b5ad1..577a29c51e 100644 --- a/pkg/helper/k8smeta/k8s_meta_manager.go +++ b/pkg/helper/k8smeta/k8s_meta_manager.go @@ -40,13 +40,15 @@ type FlushCh struct { } type MetaManager struct { - clientset *kubernetes.Clientset - stopCh chan struct{} + clientset *kubernetes.Clientset + restConfig *rest.Config + stopCh chan struct{} ready atomic.Bool 
metadataHandler *metadataHandler cacheMap map[string]MetaCache + cacheMu sync.RWMutex linkGenerator *LinkGenerator linkRegisterMap map[string][]string registerLock sync.RWMutex @@ -74,13 +76,62 @@ func GetMetaManagerInstance() *MetaManager { for _, resource := range AllResources { metaManager.cacheMap[resource] = newK8sMetaCache(metaManager.stopCh, resource) } - metaManager.linkGenerator = NewK8sMetaLinkGenerator(metaManager.cacheMap) + metaManager.linkGenerator = NewK8sMetaLinkGenerator(metaManager.cacheMap, &metaManager.cacheMu) metaManager.linkRegisterMap = make(map[string][]string) metaManager.projectNames = make(map[string]int) }) return metaManager } +// RegisterCustomResourceCollector registers a dynamic informer cache keyed by cfg.EntityType (after Normalize). +// Optional PodLink registers Pod→CR link generation for PodLinkTypeForEntity(EntityType). +// Safe before or after Init; if Init already ran, the dynamic client is attached immediately. +func (m *MetaManager) RegisterCustomResourceCollector(cfg CustomResourceCollectorConfig) error { + if err := cfg.Normalize(); err != nil { + return err + } + m.cacheMu.Lock() + if exist, ok := m.cacheMap[cfg.EntityType]; ok { + if uc, isCR := exist.(*crUnifiedCache); isCR { + uc.SetGVRIfNotStarted(cfg.ToGVR()) + } + } else { + m.cacheMap[cfg.EntityType] = newCRUnifiedCache(m.stopCh, cfg.EntityType, cfg.ToGVR()) + if m.restConfig != nil { + if uc, ok := m.cacheMap[cfg.EntityType].(*crUnifiedCache); ok { + if err := uc.setRESTConfig(m.restConfig); err != nil { + // Graceful degradation: dynamicClient unset; crUnifiedCache.EnsureWatchStarted skips when client is nil (this CR informer does not run). 
+ logger.Error(context.Background(), K8sMetaUnifyErrorCode, "setRESTConfig for custom resource cache", err, "entityType", cfg.EntityType) + } + } + } + } + m.cacheMu.Unlock() + + if cfg.PodLink != nil { + m.linkGenerator.registerPodCRLink(PodLinkTypeForEntity(cfg.EntityType), &podCRLinkRuntime{ + entityType: cfg.EntityType, + ownerKind: cfg.PodLink.OwnerKind, + ownerAPIGroupSubstr: firstNonEmpty(cfg.PodLink.OwnerAPIGroupContains, cfg.APIGroup), + podLabelKey: cfg.PodLink.PodLabelKey, + }) + } + return nil +} + +// EnsureCustomResourceInformerStarted starts the dynamic informer for an EntityType if the REST config is ready. +func (m *MetaManager) EnsureCustomResourceInformerStarted(entityType string) { + m.cacheMu.RLock() + c, ok := m.cacheMap[entityType] + m.cacheMu.RUnlock() + if !ok { + return + } + if uc, ok := c.(*crUnifiedCache); ok { + uc.EnsureWatchStarted() + } +} + func (m *MetaManager) Init(configPath string) (err error) { var config *rest.Config if len(configPath) > 0 { @@ -103,6 +154,19 @@ func (m *MetaManager) Init(configPath string) (err error) { return err } m.clientset = clientset + m.restConfig = config + + // CR dynamic client: setRESTConfig errors are logged only (graceful degradation; built-in meta still starts). + // Failed caches keep dynamicClient nil; EnsureWatchStarted skips there (no CR informer until a successful setRESTConfig, e.g. after restart). 
+ m.cacheMu.Lock() + for _, c := range m.cacheMap { + if uc, ok := c.(*crUnifiedCache); ok { + if err := uc.setRESTConfig(config); err != nil { + logger.Error(context.Background(), K8sMetaUnifyErrorCode, "setRESTConfig for custom resource cache at Init", err, "resourceType", uc.resourceType) + } + } + } + m.cacheMu.Unlock() m.metricRecord = selfmonitor.MetricsRecord{} m.addEventCount = selfmonitor.NewCounterMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaAddEventTotal) @@ -116,9 +180,21 @@ func (m *MetaManager) Init(configPath string) (err error) { go func() { startTime := time.Now() + m.cacheMu.RLock() + caches := make([]struct { + name string + c MetaCache + }, 0, len(m.cacheMap)) for resourceType, cache := range m.cacheMap { - logger.Info(context.Background(), resourceType, "init success") - cache.init(clientset) + caches = append(caches, struct { + name string + c MetaCache + }{resourceType, cache}) + } + m.cacheMu.RUnlock() + for _, ent := range caches { + logger.Info(context.Background(), ent.name, "init success") + ent.c.init(clientset) } m.ready.Store(true) logger.Info(context.Background(), "init k8s meta manager", "success", "latancy (ms)", fmt.Sprintf("%d", time.Since(startTime).Milliseconds())) @@ -136,7 +212,10 @@ func (m *MetaManager) IsReady() bool { } func (m *MetaManager) RegisterSendFunc(projectName, configName, resourceType string, sendFunc SendFunc, interval int) { - if cache, ok := m.cacheMap[resourceType]; ok { + m.cacheMu.RLock() + cache, ok := m.cacheMap[resourceType] + m.cacheMu.RUnlock() + if ok { cache.RegisterSendFunc(configName, func(events []*K8sMetaEvent) { defer panicRecover() sendFunc(events) @@ -174,7 +253,13 @@ func (m *MetaManager) RegisterSendFunc(projectName, configName, resourceType str } func (m *MetaManager) UnRegisterAllSendFunc(projectName, configName string) { + m.cacheMu.RLock() + caches := make([]MetaCache, 0, len(m.cacheMap)) for _, cache := range m.cacheMap { + caches = append(caches, cache) + } + 
m.cacheMu.RUnlock() + for _, cache := range caches { cache.UnRegisterSendFunc(configName) } m.registerLock.Lock() @@ -197,10 +282,12 @@ func GetMetaManagerMetrics() []map[string]string { // cache queueSize := 0 cacheSize := 0 + manager.cacheMu.RLock() for _, cache := range manager.cacheMap { queueSize += cache.GetQueueSize() cacheSize += cache.GetSize() } + manager.cacheMu.RUnlock() manager.queueSizeGauge.Set(float64(queueSize)) manager.cacheResourceGauge.Set(float64(cacheSize)) // set labels @@ -239,6 +326,13 @@ func (m *MetaManager) runServer() { go m.metadataHandler.K8sServerRun(m.stopCh) } +func firstNonEmpty(val, def string) string { + if strings.TrimSpace(val) != "" { + return val + } + return def +} + func isEntity(resourceType string) bool { return !strings.Contains(resourceType, LINK_SPLIT_CHARACTER) } diff --git a/plugins/input/kubernetesmetav2/meta_collector.go b/plugins/input/kubernetesmetav2/meta_collector.go index fbed1d39cb..87e340eb45 100644 --- a/plugins/input/kubernetesmetav2/meta_collector.go +++ b/plugins/input/kubernetesmetav2/meta_collector.go @@ -31,6 +31,35 @@ type metaCollector struct { stopCh chan struct{} entityProcessor map[string]ProcessFunc + crConfigs map[string]k8smeta.CustomResourceCollectorConfig +} + +func validateCustomResourceEntityTypeUniqueness( + cfg k8smeta.CustomResourceCollectorConfig, seenEntityTypes map[string]struct{}, +) error { + if _, exists := seenEntityTypes[cfg.EntityType]; exists { + return fmt.Errorf("duplicated CustomResources EntityType %q", cfg.EntityType) + } + seenEntityTypes[cfg.EntityType] = struct{}{} + return nil +} + +func prepareNormalizedCustomResourceConfigs( + customResources []k8smeta.CustomResourceCollectorConfig, +) ([]k8smeta.CustomResourceCollectorConfig, error) { + seenCustomResourceEntityTypes := make(map[string]struct{}) + normalizedConfigs := make([]k8smeta.CustomResourceCollectorConfig, 0, len(customResources)) + for _, cfg := range customResources { + if err := cfg.Normalize(); err != 
nil { + logger.Warning(context.Background(), k8smeta.K8sMetaUnifyErrorCode, "invalid CustomResources entry", err, "entity", cfg.EntityType) + continue + } + if err := validateCustomResourceEntityTypeUniqueness(cfg, seenCustomResourceEntityTypes); err != nil { + return nil, err + } + normalizedConfigs = append(normalizedConfigs, cfg) + } + return normalizedConfigs, nil } func (m *metaCollector) Start() error { @@ -77,6 +106,30 @@ func (m *metaCollector) Start() error { k8smeta.INGRESS_NAMESPACE: m.processIngressNamespaceLink, } + normalizedCRConfigs, err := prepareNormalizedCustomResourceConfigs(m.serviceK8sMeta.resolvedCustomResources()) + if err != nil { + return err + } + + m.crConfigs = make(map[string]k8smeta.CustomResourceCollectorConfig) + for _, cfg := range normalizedCRConfigs { + if err := m.serviceK8sMeta.metaManager.RegisterCustomResourceCollector(cfg); err != nil { + logger.Warning(context.Background(), k8smeta.K8sMetaUnifyErrorCode, "register custom resource collector", err, "entity", cfg.EntityType) + continue + } + m.crConfigs[cfg.EntityType] = cfg + if cfg.CollectEntity { + m.entityProcessor[cfg.EntityType] = m.processCustomResourceEntity + } + if m.serviceK8sMeta.Pod && cfg.PodLink != nil && cfg.Entity2PodRelation != "" { + m.entityProcessor[k8smeta.PodLinkTypeForEntity(cfg.EntityType)] = m.processPodCustomResourceLink + } + if m.serviceK8sMeta.Namespace && cfg.CollectEntity && cfg.Namespace2EntityRelation != "" { + m.entityProcessor[k8smeta.NamespaceLinkTypeForEntity(cfg.EntityType)] = m.processNamespaceCustomResourceLink + } + m.serviceK8sMeta.metaManager.EnsureCustomResourceInformerStarted(cfg.EntityType) + } + if m.serviceK8sMeta.Pod { m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD, m.handleEvent, m.serviceK8sMeta.Interval) } @@ -122,6 +175,17 @@ func (m *metaCollector) Start() error { if m.serviceK8sMeta.Ingress { 
 		m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.INGRESS, m.handleEvent, m.serviceK8sMeta.Interval)
 	}
+	for entityType, cfg := range m.crConfigs {
+		if cfg.CollectEntity {
+			m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, entityType, m.handleEvent, m.serviceK8sMeta.Interval)
+		}
+		if m.serviceK8sMeta.Pod && cfg.PodLink != nil && cfg.Entity2PodRelation != "" {
+			m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.PodLinkTypeForEntity(entityType), m.handleEvent, m.serviceK8sMeta.Interval)
+		}
+		if m.serviceK8sMeta.Namespace && cfg.CollectEntity && cfg.Namespace2EntityRelation != "" {
+			m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.NamespaceLinkTypeForEntity(entityType), m.handleEvent, m.serviceK8sMeta.Interval)
+		}
+	}
 	if m.serviceK8sMeta.Pod && m.serviceK8sMeta.Node && m.serviceK8sMeta.Node2Pod != "" {
 		m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_NODE, m.handleEvent, m.serviceK8sMeta.Interval)
diff --git a/plugins/input/kubernetesmetav2/meta_collector_cr.go b/plugins/input/kubernetesmetav2/meta_collector_cr.go
new file mode 100644
index 0000000000..8ecc6853cc
--- /dev/null
+++ b/plugins/input/kubernetesmetav2/meta_collector_cr.go
@@ -0,0 +1,90 @@
+package kubernetesmetav2
+
+import (
+	"strings"
+	"time"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+	"github.com/alibaba/ilogtail/pkg/helper/k8smeta"
+	"github.com/alibaba/ilogtail/pkg/models"
+)
+
+func (m *metaCollector) processCustomResourceEntity(data *k8smeta.ObjectWrapper, method string) []models.PipelineEvent {
+	cfg, ok := m.crConfigs[data.ResourceType]
+	if !ok {
+		return nil
+	}
+	obj, ok := data.Raw.(*unstructured.Unstructured)
+	if !ok {
+		return nil
+	}
+	log := &models.Log{}
+	log.Contents = models.NewLogContents()
+	log.Timestamp = uint64(time.Now().Unix())
+	kindKey := data.ResourceType
+	m.processEntityCommonPart(log.Contents, kindKey, obj.GetNamespace(), obj.GetName(), method, data.FirstObservedTime, data.LastObservedTime, obj.GetCreationTimestamp())
+	log.Contents.Add(entityKindFieldName, cfg.Kind)
+	log.Contents.Add("api_version", obj.GetAPIVersion())
+	log.Contents.Add("namespace", obj.GetNamespace())
+
+	if cfg.EnableLabels {
+		log.Contents.Add("labels", m.processEntityJSONObject(obj.GetLabels()))
+	}
+	if cfg.EnableAnnotations {
+		log.Contents.Add("annotations", m.processEntityJSONObject(obj.GetAnnotations()))
+	}
+	return []models.PipelineEvent{log}
+}
+
+func (m *metaCollector) processNamespaceCustomResourceLink(data *k8smeta.ObjectWrapper, method string) []models.PipelineEvent {
+	obj, ok := data.Raw.(*k8smeta.NamespaceCustomResource)
+	if !ok {
+		return nil
+	}
+	nsLinkSuffix := k8smeta.LINK_SPLIT_CHARACTER + k8smeta.NAMESPACE
+	if !strings.HasSuffix(data.ResourceType, nsLinkSuffix) {
+		return nil
+	}
+	entityType := strings.TrimSuffix(data.ResourceType, nsLinkSuffix)
+	if entityType == "" {
+		return nil
+	}
+	cfg := m.crConfigs[entityType]
+	if cfg.EntityType == "" || cfg.Namespace2EntityRelation == "" {
+		return nil
+	}
+	log := &models.Log{}
+	log.Contents = models.NewLogContents()
+	m.processEntityLinkCommonPart(log.Contents, obj.Namespace.Kind, obj.Namespace.Namespace, obj.Namespace.Name,
+		cfg.EntityType, obj.CR.GetNamespace(), obj.CR.GetName(), method, data.FirstObservedTime, data.LastObservedTime)
+	log.Contents.Add(entityLinkRelationTypeFieldName, cfg.Namespace2EntityRelation)
+	log.Timestamp = uint64(time.Now().Unix())
+	return []models.PipelineEvent{log}
+}
+
+func (m *metaCollector) processPodCustomResourceLink(data *k8smeta.ObjectWrapper, method string) []models.PipelineEvent {
+	obj, ok := data.Raw.(*k8smeta.PodCustomResource)
+	if !ok {
+		return nil
+	}
+	podCRPrefix := k8smeta.POD + k8smeta.LINK_SPLIT_CHARACTER
+	if !strings.HasPrefix(data.ResourceType, podCRPrefix) {
+		return nil
+	}
+	entityType := strings.TrimPrefix(data.ResourceType, podCRPrefix)
+	if entityType == "" {
+		return nil
+	}
+	cfg, ok := m.crConfigs[entityType]
+	if !ok || cfg.Entity2PodRelation == "" {
+		return nil
+	}
+	log := &models.Log{}
+	log.Contents = models.NewLogContents()
+	m.processEntityLinkCommonPart(log.Contents, entityType, obj.CR.GetNamespace(), obj.CR.GetName(),
+		obj.Pod.Kind, obj.Pod.Namespace, obj.Pod.Name, method, data.FirstObservedTime, data.LastObservedTime)
+	log.Contents.Add(entityLinkRelationTypeFieldName, cfg.Entity2PodRelation)
+	log.Timestamp = uint64(time.Now().Unix())
+	return []models.PipelineEvent{log}
+}
diff --git a/plugins/input/kubernetesmetav2/meta_collector_cr_test.go b/plugins/input/kubernetesmetav2/meta_collector_cr_test.go
new file mode 100644
index 0000000000..437b95ad00
--- /dev/null
+++ b/plugins/input/kubernetesmetav2/meta_collector_cr_test.go
@@ -0,0 +1,308 @@
+package kubernetesmetav2
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+
+	"github.com/alibaba/ilogtail/pkg/helper/k8smeta"
+	"github.com/alibaba/ilogtail/pkg/models"
+)
+
+func testWorkflowUnstructured(t *testing.T) *unstructured.Unstructured {
+	t.Helper()
+	u := &unstructured.Unstructured{}
+	u.SetAPIVersion("argoproj.io/v1alpha1")
+	u.SetKind("Workflow")
+	u.SetNamespace("default")
+	u.SetName("wf-entity")
+	u.SetLabels(map[string]string{"keep": "yes", "drop": "no"})
+	u.SetAnnotations(map[string]string{"anno": "v"})
+	require.NoError(t, unstructured.SetNestedField(u.Object, "Running", "status", "phase"))
+	return u
+}
+
+func TestValidateCustomResourceEntityTypeUniqueness(t *testing.T) {
+	seen := make(map[string]struct{})
+	cfg := k8smeta.CustomResourceCollectorConfig{EntityType: "customresource/argoproj.io/workflow"}
+	require.NoError(t, validateCustomResourceEntityTypeUniqueness(cfg, seen))
+	err := validateCustomResourceEntityTypeUniqueness(cfg, seen)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "duplicated CustomResources EntityType")
+}
+
+func TestPrepareNormalizedCustomResourceConfigsReturnErrorOnDuplicateEntityType(t *testing.T) {
+	cfg1 := k8smeta.CustomResourceCollectorConfig{
+		EntityType:    "customresource/argoproj.io/workflow",
+		APIGroup:      "argoproj.io",
+		APIVersion:    "v1alpha1",
+		Resource:      "workflows",
+		Kind:          "Workflow",
+		PodLink:       &k8smeta.PodToCustomResourceLinkConfig{},
+		CollectEntity: true,
+	}
+	cfg2 := k8smeta.CustomResourceCollectorConfig{
+		EntityType: "customresource/argoproj.io/workflow",
+		APIGroup:   "argoproj.io",
+		APIVersion: "v1alpha1",
+		Resource:   "workflows",
+		Kind:       "Workflow",
+	}
+
+	normalized, err := prepareNormalizedCustomResourceConfigs([]k8smeta.CustomResourceCollectorConfig{cfg1, cfg2})
+	require.Error(t, err)
+	assert.Nil(t, normalized)
+	assert.Contains(t, err.Error(), "duplicated CustomResources EntityType")
+}
+
+func TestPrepareNormalizedCustomResourceConfigsSkipsInvalidEntries(t *testing.T) {
+	invalid := k8smeta.CustomResourceCollectorConfig{
+		EntityType: "customresource/argoproj.io/workflow",
+	}
+	valid := k8smeta.CustomResourceCollectorConfig{
+		EntityType: "customresource/argoproj.io/rollout",
+		APIGroup:   "argoproj.io",
+		APIVersion: "v1alpha1",
+		Resource:   "rollouts",
+		Kind:       "Rollout",
+		PodLink:    &k8smeta.PodToCustomResourceLinkConfig{},
+	}
+
+	normalized, err := prepareNormalizedCustomResourceConfigs([]k8smeta.CustomResourceCollectorConfig{invalid, valid})
+	require.NoError(t, err)
+	require.Len(t, normalized, 1)
+	assert.Equal(t, "customresource/argoproj.io/rollout", normalized[0].EntityType)
+	// Normalize should fill PodLink defaults from CR config.
+	assert.Equal(t, "Rollout", normalized[0].PodLink.OwnerKind)
+	assert.Equal(t, "argoproj.io", normalized[0].PodLink.OwnerAPIGroupContains)
+}
+
+func TestProcessPodCustomResourceLink(t *testing.T) {
+	entityType := "argo.workflow"
+	linkRT := k8smeta.POD + k8smeta.LINK_SPLIT_CHARACTER + entityType
+
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"},
+	}
+	cr := &unstructured.Unstructured{}
+	cr.SetAPIVersion("argoproj.io/v1alpha1")
+	cr.SetKind("Workflow")
+	cr.SetNamespace("default")
+	cr.SetName("wf1")
+
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{},
+		crConfigs: map[string]k8smeta.CustomResourceCollectorConfig{
+			entityType: {
+				EntityType:         entityType,
+				Entity2PodRelation: "contains",
+			},
+		},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: linkRT,
+		Raw: &k8smeta.PodCustomResource{
+			Pod: pod,
+			CR:  cr,
+		},
+		FirstObservedTime: 1,
+		LastObservedTime:  2,
+	}
+	events := m.processPodCustomResourceLink(data, "update")
+	require.Len(t, events, 1)
+	log, ok := events[0].(*models.Log)
+	require.True(t, ok)
+	rel := log.Contents.Get(entityLinkRelationTypeFieldName)
+	assert.Equal(t, "contains", rel)
+}
+
+func TestProcessNamespaceCustomResourceLink(t *testing.T) {
+	entityType := "argo.workflow"
+	linkRT := entityType + k8smeta.LINK_SPLIT_CHARACTER + k8smeta.NAMESPACE
+
+	ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "production"}}
+	cr := &unstructured.Unstructured{}
+	cr.SetAPIVersion("argoproj.io/v1alpha1")
+	cr.SetKind("Workflow")
+	cr.SetNamespace("production")
+	cr.SetName("wf-ns")
+
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{},
+		crConfigs: map[string]k8smeta.CustomResourceCollectorConfig{
+			entityType: {
+				EntityType:               entityType,
+				Namespace2EntityRelation: "contains",
+			},
+		},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: linkRT,
+		Raw: &k8smeta.NamespaceCustomResource{
+			Namespace: ns,
+			CR:        cr,
+		},
+		FirstObservedTime: 10,
+		LastObservedTime:  20,
+	}
+	events := m.processNamespaceCustomResourceLink(data, "update")
+	require.Len(t, events, 1)
+	log, ok := events[0].(*models.Log)
+	require.True(t, ok)
+	assert.Equal(t, "contains", log.Contents.Get(entityLinkRelationTypeFieldName))
+}
+
+func TestProcessNamespaceCustomResourceLinkSkipsWhenRelationUnset(t *testing.T) {
+	entityType := "argo.workflow"
+	linkRT := entityType + k8smeta.LINK_SPLIT_CHARACTER + k8smeta.NAMESPACE
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{},
+		crConfigs: map[string]k8smeta.CustomResourceCollectorConfig{
+			entityType: {EntityType: entityType},
+		},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: linkRT,
+		Raw: &k8smeta.NamespaceCustomResource{
+			Namespace: &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "x"}},
+			CR:        &unstructured.Unstructured{},
+		},
+	}
+	assert.Nil(t, m.processNamespaceCustomResourceLink(data, "update"))
+}
+
+func TestProcessNamespaceCustomResourceLinkRejectsNonNamespaceLinkResourceType(t *testing.T) {
+	m := &metaCollector{
+		crConfigs: map[string]k8smeta.CustomResourceCollectorConfig{
+			"argo.workflow": {EntityType: "argo.workflow", Namespace2EntityRelation: "contains"},
+		},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: "argo.workflow",
+		Raw: &k8smeta.NamespaceCustomResource{
+			Namespace: &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
+			CR:        &unstructured.Unstructured{},
+		},
+	}
+	assert.Nil(t, m.processNamespaceCustomResourceLink(data, "update"))
+}
+
+func TestProcessPodCustomResourceLinkRejectsNonPodCRResourceType(t *testing.T) {
+	m := &metaCollector{
+		crConfigs: map[string]k8smeta.CustomResourceCollectorConfig{
+			"argo.workflow": {EntityType: "argo.workflow", Entity2PodRelation: "contains"},
+		},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: "argo.workflow",
+		Raw: &k8smeta.PodCustomResource{
+			Pod: &corev1.Pod{},
+			CR:  &unstructured.Unstructured{},
+		},
+	}
+	assert.Nil(t, m.processPodCustomResourceLink(data, "update"))
+}
+
+func TestProcessPodCustomResourceLinkWrongRawType(t *testing.T) {
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{},
+		crConfigs:      map[string]k8smeta.CustomResourceCollectorConfig{"argo.workflow": {}},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: k8smeta.POD + k8smeta.LINK_SPLIT_CHARACTER + "argo.workflow",
+		Raw:          &corev1.Pod{},
+	}
+	assert.Nil(t, m.processPodCustomResourceLink(data, "update"))
+}
+
+func stringField(t *testing.T, log *models.Log, key string) string {
+	t.Helper()
+	v := log.Contents.Get(key)
+	require.NotNil(t, v)
+	s, ok := v.(string)
+	require.True(t, ok)
+	return s
+}
+
+func TestProcessCustomResourceEntityUnknownResourceType(t *testing.T) {
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{Interval: 10},
+		crConfigs:      map[string]k8smeta.CustomResourceCollectorConfig{},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType: "unknown.type",
+		Raw:          testWorkflowUnstructured(t),
+	}
+	assert.Nil(t, m.processCustomResourceEntity(data, "update"))
+}
+
+func TestProcessCustomResourceEntityWrongRawType(t *testing.T) {
+	entityType := "argo.workflow"
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{Interval: 10},
+		crConfigs: map[string]k8smeta.CustomResourceCollectorConfig{
+			entityType: {EntityType: entityType, Kind: "Workflow"},
+		},
+	}
+	data := &k8smeta.ObjectWrapper{ResourceType: entityType, Raw: &corev1.Pod{}}
+	assert.Nil(t, m.processCustomResourceEntity(data, "update"))
+}
+
+func TestProcessCustomResourceEntityCoreFields(t *testing.T) {
+	entityType := "argo.workflow"
+	cfg := k8smeta.CustomResourceCollectorConfig{EntityType: entityType, Kind: "Workflow"}
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{Interval: 10},
+		crConfigs:      map[string]k8smeta.CustomResourceCollectorConfig{entityType: cfg},
+	}
+	data := &k8smeta.ObjectWrapper{
+		ResourceType:      entityType,
+		Raw:               testWorkflowUnstructured(t),
+		FirstObservedTime: 100,
+		LastObservedTime:  200,
+	}
+	events := m.processCustomResourceEntity(data, "update")
+	require.Len(t, events, 1)
+	log := events[0].(*models.Log)
+	assert.Equal(t, "Workflow", stringField(t, log, entityKindFieldName))
+	assert.Equal(t, "argoproj.io/v1alpha1", stringField(t, log, "api_version"))
+	assert.Equal(t, "default", stringField(t, log, "namespace"))
+	assert.False(t, log.Contents.Contains("labels"))
+	assert.False(t, log.Contents.Contains("annotations"))
+	assert.False(t, log.Contents.Contains("status"))
+}
+
+func TestProcessCustomResourceEntityEnableLabels(t *testing.T) {
+	entityType := "argo.workflow"
+	cfg := k8smeta.CustomResourceCollectorConfig{EntityType: entityType, Kind: "Workflow", EnableLabels: true}
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{Interval: 10},
+		crConfigs:      map[string]k8smeta.CustomResourceCollectorConfig{entityType: cfg},
+	}
+	data := &k8smeta.ObjectWrapper{ResourceType: entityType, Raw: testWorkflowUnstructured(t)}
+	events := m.processCustomResourceEntity(data, "update")
+	require.Len(t, events, 1)
+	log := events[0].(*models.Log)
+	labels := stringField(t, log, "labels")
+	assert.Contains(t, labels, "keep")
+	assert.Contains(t, labels, "drop")
+}
+
+func TestProcessCustomResourceEntityEnableAnnotations(t *testing.T) {
+	entityType := "argo.workflow"
+	cfg := k8smeta.CustomResourceCollectorConfig{EntityType: entityType, Kind: "Workflow", EnableAnnotations: true}
+	m := &metaCollector{
+		serviceK8sMeta: &ServiceK8sMeta{Interval: 10},
+		crConfigs:      map[string]k8smeta.CustomResourceCollectorConfig{entityType: cfg},
+	}
+	data := &k8smeta.ObjectWrapper{ResourceType: entityType, Raw: testWorkflowUnstructured(t)}
+	events := m.processCustomResourceEntity(data, "update")
+	require.Len(t, events, 1)
+	log := events[0].(*models.Log)
+	annos := stringField(t, log, "annotations")
+	assert.Contains(t, annos, "anno")
+}
diff --git a/plugins/input/kubernetesmetav2/service_meta.go b/plugins/input/kubernetesmetav2/service_meta.go
index 1d99c224f2..d788968f49 100644
--- a/plugins/input/kubernetesmetav2/service_meta.go
+++ b/plugins/input/kubernetesmetav2/service_meta.go
@@ -31,7 +31,9 @@ type ServiceK8sMeta struct {
 	StorageClass bool
 	Ingress      bool
 	Container    bool
-	// labels and annotations switch
+	// CustomResources registers third-party CRs (dynamic informer + optional CR→Pod links via PodLink). See k8smeta.CustomResourceCollectorConfig.
+	CustomResources []k8smeta.CustomResourceCollectorConfig `json:"CustomResources,omitempty"`
+	// EnableLabels / EnableAnnotations: when true, emit full labels/annotations on built-in entity kinds (not CustomResources; those use CustomResources[].EnableLabels/EnableAnnotations).
 	EnableLabels      bool
 	EnableAnnotations bool
 	// link switch
@@ -132,6 +134,15 @@ func (s *ServiceK8sMeta) initDomain() {
 }
 
+func (s *ServiceK8sMeta) resolvedCustomResources() []k8smeta.CustomResourceCollectorConfig {
+	if len(s.CustomResources) == 0 {
+		return nil
+	}
+	out := make([]k8smeta.CustomResourceCollectorConfig, len(s.CustomResources))
+	copy(out, s.CustomResources)
+	return out
+}
+
 func init() {
 	pipeline.ServiceInputs["service_kubernetes_meta"] = func() pipeline.ServiceInput {
 		return &ServiceK8sMeta{