Changes from 7 commits
37 changes: 37 additions & 0 deletions .mk/development.mk
@@ -66,6 +66,17 @@ deploy-netflow-simulator: ## Deploy netflow simulator
undeploy-netflow-simulator: ## Undeploy netflow simulator
kubectl --ignore-not-found=true delete -f contrib/kubernetes/deployment-netflow-simulator.yaml || true

.PHONY: deploy-flp-informers
deploy-flp-informers: ## Deploy flp-informers (centralized K8s cache pusher)
sed 's|%IMAGE_TAG_BASE%|$(IMAGE_TAG_BASE)|g;s|%VERSION%|$(VERSION)|g;s|%NAMESPACE%|$(NAMESPACE)|g' contrib/kubernetes/deployment-flp-informers.yaml > /tmp/deployment-flp-informers.yaml
kubectl apply -f /tmp/deployment-flp-informers.yaml -n $(NAMESPACE)
kubectl rollout status "deploy/flp-informers" --timeout=600s -n $(NAMESPACE)

.PHONY: undeploy-flp-informers
undeploy-flp-informers: ## Undeploy flp-informers
sed 's|%IMAGE_TAG_BASE%|$(IMAGE_TAG_BASE)|g;s|%VERSION%|$(VERSION)|g;s|%NAMESPACE%|$(NAMESPACE)|g' contrib/kubernetes/deployment-flp-informers.yaml > /tmp/deployment-flp-informers.yaml
kubectl --ignore-not-found=true delete -f /tmp/deployment-flp-informers.yaml -n $(NAMESPACE) || true

##@ kind

.PHONY: create-kind-cluster
@@ -121,6 +132,32 @@ local-cleanup: prereqs-kind local-deployments-cleanup delete-kind-cluster ## Und
.PHONY: local-redeploy
local-redeploy: local-deployments-cleanup local-deployments-deploy ## Redeploy locally (on current kind)

.PHONY: local-deployments-deploy-k8scache
local-deployments-deploy-k8scache: prereqs-kind deploy-prometheus deploy-loki deploy-grafana build-image kind-load-image deploy-k8scache deploy-flp-informers deploy-netflow-simulator
kubectl get pods -n $(NAMESPACE)
kubectl rollout status -w deployment/flowlogs-pipeline -n $(NAMESPACE)
kubectl rollout status -w deployment/flp-informers -n $(NAMESPACE)
kubectl logs -l app=flowlogs-pipeline -n $(NAMESPACE)

.PHONY: deploy-k8scache
deploy-k8scache: ## Deploy FLP with k8scache server enabled
sed 's|%IMAGE_TAG_BASE%|$(IMAGE_TAG_BASE)|g;s|%VERSION%|$(VERSION)|g;s|%NAMESPACE%|$(NAMESPACE)|g' contrib/kubernetes/deployment-k8scache.yaml > /tmp/deployment-k8scache.yaml
kubectl create configmap flowlogs-pipeline-configuration --from-file=flowlogs-pipeline.conf.yaml=$(FLP_CONF_FILE) -n $(NAMESPACE)
kubectl apply -f /tmp/deployment-k8scache.yaml -n $(NAMESPACE)
kubectl rollout status "deploy/flowlogs-pipeline" --timeout=600s -n $(NAMESPACE)

.PHONY: local-deploy-k8scache
local-deploy-k8scache: prereqs-kind local-cleanup-k8scache create-kind-cluster local-deployments-deploy-k8scache ## Deploy locally on kind with k8scache and flp-informers

.PHONY: local-deployments-cleanup-k8scache
local-deployments-cleanup-k8scache: prereqs-kind undeploy-netflow-simulator undeploy undeploy-flp-informers undeploy-grafana undeploy-loki undeploy-prometheus

.PHONY: local-cleanup-k8scache
local-cleanup-k8scache: prereqs-kind local-deployments-cleanup-k8scache delete-kind-cluster ## Undeploy k8scache setup from local kind

.PHONY: local-redeploy-k8scache
local-redeploy-k8scache: local-deployments-cleanup-k8scache local-deployments-deploy-k8scache ## Redeploy locally with k8scache (on current kind)

.PHONY: ocp-deploy
ocp-deploy: ocp-cleanup deploy-prometheus deploy-loki deploy-grafana deploy ## Deploy to OCP
flowlogs_pipeline_svc_ip=$$(kubectl get svc flowlogs-pipeline -o jsonpath='{.spec.clusterIP}'); \
3 changes: 3 additions & 0 deletions Makefile
@@ -30,6 +30,9 @@ IMAGE_TAG_BASE ?= $(IMAGE_REGISTRY)/$(IMAGE_ORG)/flowlogs-pipeline
# Image URL to use all building/pushing image targets
IMAGE ?= $(IMAGE_TAG_BASE):$(VERSION)

# Kubernetes namespace for deployments - defaults to current context namespace or "default"
NAMESPACE ?= $(shell ns=$$(kubectl config view --minify --output 'jsonpath={..namespace}' 2>/dev/null); echo "$${ns:-default}")

# Image building tool (docker / podman) - docker is preferred in CI
OCI_BIN_PATH = $(shell which docker 2>/dev/null || which podman)
OCI_BIN ?= $(shell basename ${OCI_BIN_PATH})
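The `${ns:-default}` expansion in the new `NAMESPACE` default falls back to "default" whenever kubectl reports no namespace for the current context. The same expansion in isolation (simulated `ns` values, no kubectl involved):

```shell
# Empty result (context has no namespace set) -> fall back to "default"
ns=""
echo "${ns:-default}"    # prints: default

# Context reports a namespace -> use it as-is
ns="netobserv"
echo "${ns:-default}"    # prints: netobserv
```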
2 changes: 2 additions & 0 deletions README.md
@@ -54,6 +54,8 @@ Flags:
--health.address string Health server address (default "0.0.0.0")
--health.port int Health server port (default: disable health server)
-h, --help help for flowlogs-pipeline
--k8scache.address string K8s cache sync server address (default "0.0.0.0")
--k8scache.port int K8s cache sync server port (default: disabled)
Comment on lines +57 to +58
⚠️ Potential issue | 🟡 Minor
Include TLS flags in the help output.

The code defines --k8scache.tls-enabled, --k8scache.tls-cert-path, --k8scache.tls-key-path, and --k8scache.tls-ca-path, but the README only documents the address and port flags. Regenerate the help output to include all k8scache options.


--log-level string Log level: debug, info, warning, error (default "error")
--metricsSettings string json for global metrics settings
--parameters string json of config file parameters field
118 changes: 118 additions & 0 deletions cmd/flowlogs-pipeline/main.go
@@ -19,8 +19,11 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
@@ -33,12 +36,17 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/datasource"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/k8scache"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

var (
@@ -145,6 +153,12 @@ func initFlags() {
rootCmd.PersistentFlags().StringVar(&opts.Health.Address, "health.address", "0.0.0.0", "Health server address")
rootCmd.PersistentFlags().IntVar(&opts.Health.Port, "health.port", 0, "Health server port (default: disable health server) ")
rootCmd.PersistentFlags().IntVar(&opts.Profile.Port, "profile.port", 0, "Go pprof tool port (default: disabled)")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.Address, "k8scache.address", "0.0.0.0", "K8s cache sync server address")
rootCmd.PersistentFlags().IntVar(&opts.K8sCacheServer.Port, "k8scache.port", 0, "K8s cache sync server port (default: disabled)")
rootCmd.PersistentFlags().BoolVar(&opts.K8sCacheServer.TLSEnabled, "k8scache.tls-enabled", false, "Enable TLS for K8s cache sync server")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.TLSCertPath, "k8scache.tls-cert-path", "", "Path to TLS server certificate")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.TLSKeyPath, "k8scache.tls-key-path", "", "Path to TLS server private key")
rootCmd.PersistentFlags().StringVar(&opts.K8sCacheServer.TLSCAPath, "k8scache.tls-ca-path", "", "Path to TLS CA certificate for client verification")
rootCmd.PersistentFlags().StringVar(&opts.PipeLine, "pipeline", "", "json of config file pipeline field")
rootCmd.PersistentFlags().StringVar(&opts.Parameters, "parameters", "", "json of config file parameters field")
rootCmd.PersistentFlags().StringVar(&opts.DynamicParameters, "dynamicParameters", "", "json of configmap location for dynamic parameters")
@@ -183,6 +197,11 @@ func run() {
utils.SetupElegantExit()
promServer := prometheus.InitializePrometheus(&cfg.MetricsSettings)

// Enable k8scache mode if configured (disables local informers to save resources)
if opts.K8sCacheServer.Port > 0 {
kubernetes.SetK8sCacheEnabled(true)
}

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
if err != nil {
@@ -204,6 +223,12 @@
healthServer = operational.NewHealthServer(&opts, mainPipeline.IsAlive, mainPipeline.IsReady)
}

// Start K8s cache server
var grpcServer *grpc.Server
if opts.K8sCacheServer.Port > 0 {
grpcServer = startK8sCacheServer(&opts.K8sCacheServer)
}

// Starts the flows pipeline
mainPipeline.Run()

@@ -213,9 +238,102 @@
if healthServer != nil {
_ = healthServer.Shutdown(context.Background())
}
if grpcServer != nil {
log.Info("stopping K8s cache sync server")
grpcServer.GracefulStop()
Comment on lines +241 to +243
⚠️ Potential issue | 🟠 Major
Guard GracefulStop() with a timeout.

StreamUpdates is a long-lived bidirectional streaming RPC. Waiting indefinitely for active streams to drain can hang shutdown; use a bounded wait and fall back to Stop().

Suggested change
 if grpcServer != nil {
 	log.Info("stopping K8s cache sync server")
-	grpcServer.GracefulStop()
+	stopped := make(chan struct{})
+	go func() {
+		grpcServer.GracefulStop()
+		close(stopped)
+	}()
+	select {
+	case <-stopped:
+	case <-time.After(5 * time.Second):
+		log.Warn("timed out waiting for K8s cache sync streams to drain; forcing stop")
+		grpcServer.Stop()
+	}
 }

}

// Give all threads a chance to exit and then exit the process
time.Sleep(time.Second)
log.Debugf("exiting main run")
os.Exit(0)
}

// startK8sCacheServer initializes and starts the gRPC server for K8s cache synchronization
// Returns nil if the datasource is not available (e.g., no kubernetes enrichment configured)
func startK8sCacheServer(cfg *config.K8sCacheServer) *grpc.Server {
// Check if kubernetes datasource is available
ds := kubernetes.GetDatasource()
if ds == nil {
log.Warn("K8s cache server requested but kubernetes datasource not initialized. " +
"Make sure kubernetes enrichment is configured in the pipeline.")
return nil
}

// Attach a Kubernetes store so the cache server can apply received updates; enrichment will use it for lookups
ds.SetKubernetesStore(datasource.NewKubernetesStore())
coderabbitai[bot] marked this conversation as resolved.

// Create cache server
cacheServer := k8scache.NewKubernetesCacheServer(ds)

// Create gRPC server with optional TLS
var grpcServer *grpc.Server
if cfg.TLSEnabled {
tlsConfig, err := createServerTLSConfig(cfg)
if err != nil {
log.WithError(err).Fatal("failed to configure TLS for K8s cache server")
return nil
}
Comment on lines +274 to +276
⚠️ Potential issue | 🟡 Minor

Unreachable code after log.Fatal.

log.Fatal calls os.Exit(1), so return nil is never reached. Either use log.Error + return nil or remove the return.

Suggested fix
 	if cfg.TLSEnabled {
 		tlsConfig, err := createServerTLSConfig(cfg)
 		if err != nil {
-			log.WithError(err).Fatal("failed to configure TLS for K8s cache server")
-			return nil
+			log.WithError(err).Error("failed to configure TLS for K8s cache server")
+			os.Exit(1)
 		}

Or simply remove return nil since Fatal terminates.


grpcServer = grpc.NewServer(grpc.Creds(tlsConfig))
log.Info("K8s cache server TLS enabled")
} else {
grpcServer = grpc.NewServer()
log.Warn("K8s cache server TLS disabled - connections are insecure (not recommended for production)")
}
k8scache.RegisterKubernetesCacheServiceServer(grpcServer, cacheServer)

// Start listening
address := fmt.Sprintf("%s:%d", cfg.Address, cfg.Port)
listener, err := net.Listen("tcp", address)
if err != nil {
log.WithError(err).WithField("address", address).Fatal("failed to start K8s cache server")
return nil
}
Comment on lines +288 to +291
⚠️ Potential issue | 🟡 Minor

Same issue: unreachable return nil after log.Fatal.


// Start server in background
go func() {
log.WithField("address", address).Info("starting K8s cache sync server")
if err := grpcServer.Serve(listener); err != nil {
log.WithError(err).Error("K8s cache sync server stopped with error")
}
}()

return grpcServer
}

// createServerTLSConfig creates TLS credentials for the gRPC server
func createServerTLSConfig(cfg *config.K8sCacheServer) (credentials.TransportCredentials, error) {
// Load server certificate and private key
if cfg.TLSCertPath == "" || cfg.TLSKeyPath == "" {
return nil, fmt.Errorf("TLS enabled but cert/key paths not provided")
}

cert, err := tls.LoadX509KeyPair(cfg.TLSCertPath, cfg.TLSKeyPath)
if err != nil {
return nil, fmt.Errorf("failed to load server cert/key: %w", err)
}

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.NoClientCert, // Default: no client cert required
}
Comment on lines +316 to +319
⚠️ Potential issue | 🟠 Major

Set MinVersion: tls.VersionTLS12 (or TLS 1.3) to prevent weak protocol negotiation.

Go servers historically defaulted to a TLS 1.0 minimum (Go 1.22 raised the default to 1.2); setting MinVersion explicitly keeps the policy unambiguous regardless of toolchain version.

Suggested fix
 tlsConfig := &tls.Config{
 	Certificates: []tls.Certificate{cert},
 	ClientAuth:   tls.NoClientCert, // Default: no client cert required
+	MinVersion:   tls.VersionTLS12,
 }


// If CA is provided, require and verify client certificates
if cfg.TLSCAPath != "" {
caCert, err := os.ReadFile(cfg.TLSCAPath)
if err != nil {
return nil, fmt.Errorf("failed to read CA cert: %w", err)
}

caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append CA cert")
}

tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
log.Info("K8s cache server: mutual TLS enabled (client certificates required)")
}

return credentials.NewTLS(tlsConfig), nil
}
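For reference, a client connecting to this server needs the mirror-image `tls.Config`: trust the serving CA and match the minimum version. A stdlib-only sketch (a hypothetical helper, not the actual informers client):

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
)

// clientTLSConfig builds the client-side counterpart of the server config:
// it trusts only the given CA bundle and refuses TLS below 1.2.
func clientTLSConfig(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, fmt.Errorf("failed to parse CA certificate PEM")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12,
	}, nil
}

func main() {
	// Invalid PEM is rejected up front rather than failing at dial time.
	if _, err := clientTLSConfig([]byte("not a certificate")); err != nil {
		fmt.Println("rejected:", err)
	}
}
```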