Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
/bin/
cover.out
y.output
.idea/
33 changes: 33 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,39 @@ Following is the supported API format for network transformations:
flowDirectionField: field providing the flow direction in the input entries; it will be rewritten
ifDirectionField: interface-level field for flow direction, to create in output
</pre>
## Transform Anomaly API
Following is the supported API format for anomaly detection transformations:

<pre>
anomaly:
algorithm: (enum) algorithm used to score anomalies: ewma or zscore
valueField: field containing the numeric value to evaluate
keyFields: list of fields combined to build the per-entity baseline key
prefix: prefix added to output fields to disambiguate when multiple anomaly stages are used
windowSize: number of recent samples to keep for baseline statistics
baselineWindow: minimum number of samples before anomaly scores are emitted
sensitivity: threshold multiplier for flagging anomalies (e.g., z-score)
ewmaAlpha: smoothing factor for ewma algorithm; derived from windowSize if omitted
</pre>

Example pipeline stage:

<pre>
- name: anomaly
transform:
type: anomaly
anomaly:
algorithm: zscore
valueField: bytes
keyFields: [srcIP, dstIP, proto]
prefix: z_
windowSize: 20
baselineWindow: 5
sensitivity: 3
</pre>
The anomaly stage appends `anomaly_score`, `anomaly_type`, and `baseline_window` to each flow record; when the optional `prefix` is set, it is prepended to each of these field names. Scores reflect the chosen
algorithm (EWMA or z-score); a warm-up period suppresses alerts until the baseline window is populated. Grafana users can
alert on `anomaly_type != "normal"` or apply a threshold to `anomaly_score` using the emitted fields.
## Write Loki API
Following is the supported API format for writing to loki:

Expand Down
44 changes: 44 additions & 0 deletions hack/examples/pipeline-anomaly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Example pipeline exercising the anomaly transform stage.
# Stages: ingest (synthetic flows) -> detect-anomaly (zscore on Bytes) -> write_stdout.
# The detect-anomaly stage keys its baseline on SrcAddr, DstAddr and Proto, and uses
# the "bytes_" prefix for the fields it adds (hence bytes_anomaly_score below).
log-level: info
pipeline:
- name: ingest
- name: detect-anomaly
follows: ingest
- name: write_stdout # for debugging
follows: detect-anomaly
# Uncomment for production: export anomaly metrics to Prometheus
# - name: write_metrics
# follows: detect-anomaly
# write:
# type: prometheus
# prometheus:
# port: 9102
# prefix: flowlogs_pipeline_
# metrics:
# - name: anomaly_score
# type: gauge
# valueKey: bytes_anomaly_score
# labels:
# - SrcAddr
# - DstAddr
# - Proto
# - anomaly_type
parameters:
- name: ingest
ingest:
type: synthetic
synthetic:
flowLogsPerMin: 10
- name: detect-anomaly
transform:
type: anomaly
anomaly:
algorithm: zscore
valueField: Bytes
keyFields: [SrcAddr, DstAddr, Proto]
prefix: bytes_
windowSize: 20
baselineWindow: 5
sensitivity: 3
- name: write_stdout
write:
type: stdout
2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
GenericType = "generic"
NetworkType = "network"
FilterType = "filter"
AnomalyType = "anomaly"
ConnTrackType = "conntrack"
NoneType = "none"

Expand All @@ -60,6 +61,7 @@ type API struct {
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
TransformAnomaly TransformAnomaly `yaml:"anomaly" doc:"## Transform Anomaly API\nFollowing is the supported API format for anomaly detection transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
WriteIPFIX WriteIpfix `yaml:"ipfix" doc:"## Write IPFIX\nFollowing is the supported API format for writing to an IPFIX collector:\n"`
Expand Down
39 changes: 39 additions & 0 deletions pkg/api/transform_anomaly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (C) 2024 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api
Comment thread
jotak marked this conversation as resolved.
Outdated

// TransformAnomalyAlgorithm defines the supported anomaly detection strategies.
// It is the type of the TransformAnomaly.Algorithm configuration field.
// For doc generation, enum definitions must match format `Constant Type = "value" // doc`
type TransformAnomalyAlgorithm string

// Supported TransformAnomalyAlgorithm values. Keep each entry on a single line with
// its trailing comment, as required by the doc-generation format noted on the type.
const (
AnomalyAlgorithmEWMA TransformAnomalyAlgorithm = "ewma" // exponentially weighted moving average baseline
AnomalyAlgorithmZScore TransformAnomalyAlgorithm = "zscore" // rolling z-score over a sliding window
)

// TransformAnomaly describes configuration for anomaly detection stages.
//
// Every field is tagged omitempty for both YAML and JSON, so zero values are
// left out when the configuration is serialized. The `doc` tag on each field
// holds its user-facing description (presumably consumed by the API doc
// generator; the same text appears in docs/api.md).
type TransformAnomaly struct {
Algorithm TransformAnomalyAlgorithm `yaml:"algorithm,omitempty" json:"algorithm,omitempty" doc:"(enum) algorithm used to score anomalies: ewma or zscore"`
ValueField string `yaml:"valueField,omitempty" json:"valueField,omitempty" doc:"field containing the numeric value to evaluate"`
KeyFields []string `yaml:"keyFields,omitempty" json:"keyFields,omitempty" doc:"list of fields combined to build the per-entity baseline key"`
Prefix string `yaml:"prefix,omitempty" json:"prefix,omitempty" doc:"prefix added to output fields to disambiguate when multiple anomaly stages are used"`
WindowSize int `yaml:"windowSize,omitempty" json:"windowSize,omitempty" doc:"number of recent samples to keep for baseline statistics"`
BaselineWindow int `yaml:"baselineWindow,omitempty" json:"baselineWindow,omitempty" doc:"minimum number of samples before anomaly scores are emitted"`
Sensitivity float64 `yaml:"sensitivity,omitempty" json:"sensitivity,omitempty" doc:"threshold multiplier for flagging anomalies (e.g., z-score)"`
EWMAAlpha float64 `yaml:"ewmaAlpha,omitempty" json:"ewmaAlpha,omitempty" doc:"smoothing factor for ewma algorithm; derived from windowSize if omitted"`
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ type Transform struct {
Generic *api.TransformGeneric `yaml:"generic,omitempty" json:"generic,omitempty"`
Filter *api.TransformFilter `yaml:"filter,omitempty" json:"filter,omitempty"`
Network *api.TransformNetwork `yaml:"network,omitempty" json:"network,omitempty"`
Anomaly *api.TransformAnomaly `yaml:"anomaly,omitempty" json:"anomaly,omitempty"`
}

type Extract struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/stage_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam
return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}
}

// NewTransformAnomalyParams builds the StageParam for a transform stage of
// type api.AnomalyType named name, carrying an as its anomaly configuration.
//
//nolint:gocritic // hugeParam can be ignored: func only used at init
func NewTransformAnomalyParams(name string, an api.TransformAnomaly) StageParam {
	tr := &Transform{Type: api.AnomalyType, Anomaly: &an}
	return StageParam{Name: name, Transform: tr}
}

//nolint:gocritic // hugeParam can be ignored: func only used at init
func NewConnTrackParams(name string, ct api.ConnTrack) StageParam {
return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}}
Expand Down
7 changes: 5 additions & 2 deletions pkg/operational/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ type Metrics struct {
}

func NewMetrics(settings *config.MetricsSettings) *Metrics {
return &Metrics{settings: settings}
}
if settings == nil {
settings = &config.MetricsSettings{}
}
return &Metrics{settings: settings}
}

// register will register against the default registry. May panic or not depending on settings
func (o *Metrics) register(c prometheus.Collector, name string) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,8 @@ func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (t
transformer, err = transform.NewTransformFilter(params)
case api.NetworkType:
transformer, err = transform.NewTransformNetwork(params, opMetrics)
case api.AnomalyType:
transformer, err = transform.NewTransformAnomaly(params, opMetrics)
case api.NoneType:
transformer, err = transform.NewTransformNone()
default:
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Definition struct {
Type string
Generic api.TransformGeneric
Network api.TransformNetwork
Anomaly api.TransformAnomaly
}

type Definitions []Definition
Loading