Changes from 3 commits
5 changes: 5 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/flowlogs-pipeline.iml

6 changes: 6 additions & 0 deletions .idea/misc.xml

8 changes: 8 additions & 0 deletions .idea/modules.xml

6 changes: 6 additions & 0 deletions .idea/vcs.xml

31 changes: 31 additions & 0 deletions docs/api.md
@@ -303,6 +303,37 @@ Following is the supported API format for network transformations:
flowDirectionField: field providing the flow direction in the input entries; it will be rewritten
ifDirectionField: interface-level field for flow direction, to create in output
</pre>
## Transform Anomaly API
Following is the supported API format for anomaly detection transformations:

<pre>
 anomaly:
     algorithm: (enum) algorithm used to score anomalies: ewma or zscore
     valueField: field containing the numeric value to evaluate
     keyFields: list of fields combined to build the per-entity baseline key
     windowSize: number of recent samples to keep for baseline statistics
     baselineWindow: minimum number of samples before anomaly scores are emitted
     sensitivity: threshold multiplier for flagging anomalies (e.g., z-score)
     ewmaAlpha: smoothing factor for ewma algorithm; derived from windowSize if omitted
</pre>

Example pipeline stage:

<pre>
- name: anomaly
  transform:
    type: anomaly
    anomaly:
      algorithm: zscore
      valueField: bytes
      keyFields: [srcIP, dstIP, proto]
      windowSize: 20
      baselineWindow: 5
      sensitivity: 3
</pre>
The anomaly stage appends `anomaly_score`, `anomaly_type`, and `baseline_window` to each flow record. The score is computed by
the configured algorithm (EWMA or z-score). During warm-up, alerts are suppressed until the baseline for a key holds at least
`baselineWindow` samples. Grafana users can alert on `anomaly_type != "normal"` or apply a threshold to `anomaly_score`.
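
The scoring described above can be sketched in Go as follows. This is a minimal illustration under stated assumptions, not the code added by this PR: the `detector` type, the `newDetector` and `score` helpers, the `|` key separator, and the `warming_up` and `anomaly` labels are all hypothetical (the docs only name `normal`). It shows a rolling z-score per key with a warm-up period.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// detector is a hypothetical stand-in for the anomaly stage's state.
type detector struct {
	keyFields      []string
	windowSize     int // assumed to be >= 1
	baselineWindow int
	sensitivity    float64
	samples        map[string][]float64 // recent values per baseline key
}

func newDetector(keyFields []string, windowSize, baselineWindow int, sensitivity float64) *detector {
	return &detector{
		keyFields:      keyFields,
		windowSize:     windowSize,
		baselineWindow: baselineWindow,
		sensitivity:    sensitivity,
		samples:        map[string][]float64{},
	}
}

// key joins the values of the configured key fields into one string.
func (d *detector) key(flow map[string]interface{}) string {
	parts := make([]string, 0, len(d.keyFields))
	for _, f := range d.keyFields {
		parts = append(parts, fmt.Sprint(flow[f]))
	}
	return strings.Join(parts, "|")
}

// meanStd returns the mean and population standard deviation of xs.
func meanStd(xs []float64) (float64, float64) {
	var sum float64
	for _, x := range xs {
		sum += x
	}
	mean := sum / float64(len(xs))
	var sq float64
	for _, x := range xs {
		sq += (x - mean) * (x - mean)
	}
	return mean, math.Sqrt(sq / float64(len(xs)))
}

// score compares value with the key's existing window, then stores it.
// A zero standard deviation yields a score of 0 (a simplification).
func (d *detector) score(flow map[string]interface{}, value float64) (float64, string) {
	k := d.key(flow)
	window := d.samples[k]
	z, label := 0.0, "warming_up"
	if len(window) > 0 && len(window) >= d.baselineWindow {
		mean, std := meanStd(window)
		if std > 0 {
			z = (value - mean) / std
		}
		label = "normal"
		if math.Abs(z) > d.sensitivity {
			label = "anomaly"
		}
	}
	// Keep only the most recent windowSize samples.
	window = append(window, value)
	if len(window) > d.windowSize {
		window = window[len(window)-d.windowSize:]
	}
	d.samples[k] = window
	return z, label
}

func main() {
	d := newDetector([]string{"SrcAddr", "DstAddr", "Proto"}, 20, 5, 3)
	flow := map[string]interface{}{"SrcAddr": "10.0.0.1", "DstAddr": "10.0.0.2", "Proto": 6}
	for _, v := range []float64{100, 102, 98, 101, 99, 500} {
		z, label := d.score(flow, v)
		fmt.Printf("value=%v score=%.2f type=%s\n", v, z, label)
	}
}
```

In this sketch the first five samples only build the baseline, and the sixth is the first to be scored. Scoring a value before adding it to the window keeps a spike from diluting its own score, which is one possible design choice among several.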
## Write Loki API
Following is the supported API format for writing to loki:

27 changes: 27 additions & 0 deletions hack/examples/pipeline-anomaly.yaml
@@ -0,0 +1,27 @@
---
log-level: info
pipeline:
  - name: ingest
  - name: detect-anomaly
    follows: ingest
  - name: write
    follows: detect-anomaly
parameters:
  - name: ingest
    ingest:
      type: synthetic
      synthetic:
        flowLogsPerMin: 10
  - name: detect-anomaly
    transform:
      type: anomaly
      anomaly:
        algorithm: zscore
        valueField: Bytes
        keyFields: [SrcAddr, DstAddr, Proto]
        windowSize: 20
        baselineWindow: 5
        sensitivity: 3
  - name: write
    write:
      type: stdout
2 changes: 2 additions & 0 deletions pkg/api/api.go
@@ -40,6 +40,7 @@ const (
	GenericType = "generic"
	NetworkType = "network"
	FilterType = "filter"
	AnomalyType = "anomaly"
	ConnTrackType = "conntrack"
	NoneType = "none"

@@ -60,6 +61,7 @@ type API struct {
	TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
	TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
	TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
	TransformAnomaly TransformAnomaly `yaml:"anomaly" doc:"## Transform Anomaly API\nFollowing is the supported API format for anomaly detection transformations:\n"`
	WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
	WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
	WriteIPFIX WriteIpfix `yaml:"ipfix" doc:"## Write IPFIX\nFollowing is the supported API format for writing to an IPFIX collector:\n"`
38 changes: 38 additions & 0 deletions pkg/api/transform_anomaly.go
@@ -0,0 +1,38 @@
/*
 * Copyright (C) 2024 IBM, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package api

// TransformAnomalyAlgorithm defines the supported anomaly detection strategies.
// For doc generation, enum definitions must match format `Constant Type = "value" // doc`
type TransformAnomalyAlgorithm string

const (
	AnomalyAlgorithmEWMA   TransformAnomalyAlgorithm = "ewma"   // exponentially weighted moving average baseline
	AnomalyAlgorithmZScore TransformAnomalyAlgorithm = "zscore" // rolling z-score over a sliding window
)

// TransformAnomaly describes configuration for anomaly detection stages.
type TransformAnomaly struct {
	Algorithm      TransformAnomalyAlgorithm `yaml:"algorithm,omitempty" json:"algorithm,omitempty" doc:"(enum) algorithm used to score anomalies: ewma or zscore"`
	ValueField     string                    `yaml:"valueField,omitempty" json:"valueField,omitempty" doc:"field containing the numeric value to evaluate"`
	KeyFields      []string                  `yaml:"keyFields,omitempty" json:"keyFields,omitempty" doc:"list of fields combined to build the per-entity baseline key"`
	WindowSize     int                       `yaml:"windowSize,omitempty" json:"windowSize,omitempty" doc:"number of recent samples to keep for baseline statistics"`
	BaselineWindow int                       `yaml:"baselineWindow,omitempty" json:"baselineWindow,omitempty" doc:"minimum number of samples before anomaly scores are emitted"`
	Sensitivity    float64                   `yaml:"sensitivity,omitempty" json:"sensitivity,omitempty" doc:"threshold multiplier for flagging anomalies (e.g., z-score)"`
	EWMAAlpha      float64                   `yaml:"ewmaAlpha,omitempty" json:"ewmaAlpha,omitempty" doc:"smoothing factor for ewma algorithm; derived from windowSize if omitted"`
}
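
The `ewmaAlpha` doc string says the smoothing factor is derived from `windowSize` when omitted, but this file does not show the rule. The sketch below assumes the common span convention `alpha = 2 / (windowSize + 1)` and a standard incremental EWMA mean and variance update. The names `effectiveAlpha`, `ewmaState`, and `observe` are hypothetical and may not match the PR's implementation.

```go
package main

import (
	"fmt"
	"math"
)

// effectiveAlpha returns the configured alpha when it lies in (0, 1].
// Otherwise it derives one from windowSize using 2/(N+1), which is an
// assumption: the source only says "derived from windowSize".
func effectiveAlpha(ewmaAlpha float64, windowSize int) float64 {
	if ewmaAlpha > 0 && ewmaAlpha <= 1 {
		return ewmaAlpha
	}
	if windowSize < 1 {
		windowSize = 1
	}
	return 2 / float64(windowSize+1)
}

// ewmaState tracks an exponentially weighted mean and variance for one key.
type ewmaState struct {
	mean     float64
	variance float64
	count    int
}

// observe scores x as its distance from the current mean in standard
// deviations, then folds x into the baseline. It returns 0 while the
// baseline has no spread yet.
func (s *ewmaState) observe(x, alpha float64) float64 {
	score := 0.0
	if s.count == 0 {
		s.mean = x // the first sample seeds the baseline
	} else {
		if std := math.Sqrt(s.variance); std > 0 {
			score = math.Abs(x-s.mean) / std
		}
		diff := x - s.mean
		incr := alpha * diff
		s.mean += incr
		s.variance = (1 - alpha) * (s.variance + diff*incr)
	}
	s.count++
	return score
}

func main() {
	alpha := effectiveAlpha(0, 20) // ewmaAlpha omitted, windowSize: 20
	fmt.Printf("alpha=%.4f\n", alpha)

	s := &ewmaState{}
	for _, v := range []float64{100, 102, 98, 101, 99, 500} {
		fmt.Printf("value=%v score=%.2f\n", v, s.observe(v, alpha))
	}
}
```

Comparing the returned score with `sensitivity` would give the flagging decision. An explicit `ewmaAlpha` takes precedence over the derived value in this sketch.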
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -119,6 +119,7 @@ type Transform struct {
	Generic *api.TransformGeneric `yaml:"generic,omitempty" json:"generic,omitempty"`
	Filter *api.TransformFilter `yaml:"filter,omitempty" json:"filter,omitempty"`
	Network *api.TransformNetwork `yaml:"network,omitempty" json:"network,omitempty"`
	Anomaly *api.TransformAnomaly `yaml:"anomaly,omitempty" json:"anomaly,omitempty"`
}

type Extract struct {
5 changes: 5 additions & 0 deletions pkg/config/stage_params.go
@@ -51,6 +51,11 @@ func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam
	return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}
}

//nolint:gocritic // hugeParam can be ignored: func only used at init
func NewTransformAnomalyParams(name string, an api.TransformAnomaly) StageParam {
	return StageParam{Name: name, Transform: &Transform{Type: api.AnomalyType, Anomaly: &an}}
}

//nolint:gocritic // hugeParam can be ignored: func only used at init
func NewConnTrackParams(name string, ct api.ConnTrack) StageParam {
	return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}}
2 changes: 2 additions & 0 deletions pkg/pipeline/pipeline_builder.go
@@ -450,6 +450,8 @@ func getTransformer(opMetrics *operational.Metrics, params config.StageParam) (t
		transformer, err = transform.NewTransformFilter(params)
	case api.NetworkType:
		transformer, err = transform.NewTransformNetwork(params, opMetrics)
	case api.AnomalyType:
		transformer, err = transform.NewTransformAnomaly(params, opMetrics)
	case api.NoneType:
		transformer, err = transform.NewTransformNone()
	default:
1 change: 1 addition & 0 deletions pkg/pipeline/transform/transform.go
@@ -45,6 +45,7 @@ type Definition struct {
	Type string
	Generic api.TransformGeneric
	Network api.TransformNetwork
	Anomaly api.TransformAnomaly
}

type Definitions []Definition