Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

All notable changes to FEVER will be documented in this file.

## [1.4.0] - 2025-09-23

### Changed
- Move from the go-pg PostgreSQL library `github.com/go-pg/pg` (imported as `gopkg.in/pg.v5`) to `github.com/jackc/pgx/v4`.
- Add unit tests to PostgreSQL slurper.
- Update dependencies.

## [1.3.7] - 2025-04-16

### Changed
Expand Down
7 changes: 6 additions & 1 deletion cmd/fever/cmds/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func mainfunc(cmd *cobra.Command, args []string) {

eventChan := make(chan types.Entry, defaultQueueSize)

// create a cancellable context for components that support it
ctx, cancel := context.WithCancel(context.Background())

util.ToolName = viper.GetString("toolname")

logfilename := viper.GetString("logging.file")
Expand Down Expand Up @@ -211,7 +214,7 @@ func mainfunc(cmd *cobra.Command, args []string) {
}
s = &db.DummySlurper{}
}
s.Run(eventChan)
s.Run(ctx, eventChan)

var forwardHandler processing.Handler
// start forwarding
Expand Down Expand Up @@ -524,6 +527,8 @@ func mainfunc(cmd *cobra.Command, args []string) {
go func() {
for sig := range c {
if sig == syscall.SIGTERM || sig == syscall.SIGINT {
// cancel context to stop background tasks
cancel()
pprof.StopCPUProfile()
if submitter != nil {
submitter.Finish()
Expand Down
3 changes: 2 additions & 1 deletion db/slurper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package db
// Copyright (c) 2017, DCSO GmbH

import (
"context"
"github.com/DCSO/fever/types"
)

Expand All @@ -12,6 +13,6 @@ import (
// Finish() can be used to finalize any state.
// TODO: implement proper start/stop (currently a 'hard' stop by exit()ing)
type Slurper interface {
// NOTE(review): this span is a rendered diff hunk without +/- markers, so
// two Run signatures appear back to back. The first is presumably the
// removed (pre-change) line and the second the added one — confirm against
// the actual file; a real interface cannot declare Run twice.
//
// Pre-change form: Run receives only the channel of entries to consume.
Run(chan types.Entry)
// Post-change form: Run additionally receives a context.Context as its
// first argument. The caller in cmd/fever/cmds/run.go passes a context
// created with context.WithCancel that is cancelled on SIGTERM/SIGINT.
Run(context.Context, chan types.Entry)
// Finish can be used to finalize any state.
Finish()
}
4 changes: 3 additions & 1 deletion db/slurper_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ package db
// Copyright (c) 2017, DCSO GmbH

import (
"context"

"github.com/DCSO/fever/types"
)

// DummySlurper is a slurper that just consumes entries with no action:
// its Run method starts a goroutine that ranges over the event channel
// with an empty loop body. The type itself carries no fields.
type DummySlurper struct{}

// Run starts a DummySlurper.
func (s *DummySlurper) Run(eventchan chan types.Entry) {
func (s *DummySlurper) Run(_ctx context.Context, eventchan chan types.Entry) {
go func() {
for range eventchan {
}
Expand Down
3 changes: 2 additions & 1 deletion db/slurper_mongodb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"context"
"encoding/json"
"fmt"

Expand Down Expand Up @@ -266,7 +267,7 @@ func MakeMongoSlurper(host string, database string, user string, password string
}

// Run starts a MongoSlurper.
func (s *MongoSlurper) Run(eventchan chan types.Entry) {
func (s *MongoSlurper) Run(_ctx context.Context, eventchan chan types.Entry) {
// set up workers for each event type
for k, v := range s.TypeDispatch {
go s.eventTypeWorker(v, k)
Expand Down
97 changes: 69 additions & 28 deletions db/slurper_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,29 @@ package db

import (
"bytes"
"context"
"fmt"
"io"
"strings"
"time"

"github.com/DCSO/fever/types"

"github.com/jackc/pgx/v4/pgxpool"
log "github.com/sirupsen/logrus"
pg "gopkg.in/pg.v5"
)

var maxRetries = 20

// PostgresSlurper is a Slurper that stores events in a PostgreSQL database.
type PostgresSlurper struct {
DB *pg.DB
DB *pgxpool.Pool
DBUser string
// CopyFn allows injecting a custom COPY executor for testing.
// It should execute a COPY FROM STDIN using the provided SQL and reader and return rows copied.
CopyFn func(ctx context.Context, pool *pgxpool.Pool, sql string, r io.Reader) (int64, error)
// ExecFn allows injecting execution for DDL statements (e.g., CREATE TABLE ... GRANT ...)
ExecFn func(ctx context.Context, sql string) error
LastRotatedTime time.Time
IndexChan chan string
CurrentTableName string
Expand All @@ -38,12 +46,15 @@ func MakePostgresSlurper(host string, database string, user string,
var err error
var i int
var hasExt int
db := pg.Connect(&pg.Options{
User: user,
Password: password,
Addr: host,
Database: database,
})
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", user, password, host, database)
cfg, cfgErr := pgxpool.ParseConfig(dsn)
if cfgErr != nil {
log.WithError(cfgErr).Fatal("failed to parse pgx config")
}
db, err := pgxpool.ConnectConfig(context.Background(), cfg)
if err != nil {
log.WithError(err).Fatal("failed to connect to postgres via pgxpool")
}
l := log.WithFields(log.Fields{
"domain": "slurper",
"slurper": "postgres",
Expand All @@ -53,14 +64,13 @@ func MakePostgresSlurper(host string, database string, user string,
"host": host,
"database": database,
}).Info("connected to database")
_, err = db.Query(pg.Scan(&hasExt), SQLCheckForTrigramExtension)
for i = 0; err != nil && strings.Contains(err.Error(), "system is starting up"); i++ {
if i > maxRetries {
for i = 0; ; i++ {
err = db.QueryRow(context.Background(), SQLCheckForTrigramExtension).Scan(&hasExt)
if err == nil || i > maxRetries || !strings.Contains(err.Error(), "system is starting up") {
break
}
l.Warnf("problem checking for trigram extension: %s -- retrying %d/%d",
err.Error(), i, maxRetries)
_, err = db.Query(pg.Scan(&hasExt), SQLCheckForTrigramExtension)
time.Sleep(10 * time.Second)
}
if err != nil {
Expand All @@ -70,16 +80,17 @@ func MakePostgresSlurper(host string, database string, user string,
l.Fatal("trigram extension ('pg_trgm') not loaded, please run "+
"'CREATE EXTENSION pg_trgm;'", err)
}
_, err = db.Exec(SQLTrigramFunction)
_, err = db.Exec(context.Background(), SQLTrigramFunction)
if err != nil {
l.Fatalf("error creating index preparation function: %s", err)
}
_, err = db.Exec(SQLQueryAllEvents)
_, err = db.Exec(context.Background(), SQLQueryAllEvents)
if err != nil {
l.Fatalf("error creating global query function: %s", err)
}
s := &PostgresSlurper{
DB: db,
DBUser: user,
RotationInterval: rotationInterval,
MaxTableSize: maxTableSize * 1024 * 1024 * 1024,
ChunkSize: chunkSize,
Expand All @@ -93,13 +104,22 @@ type tableSize struct {
Size int64
}

func (s *PostgresSlurper) expireOldTables() error {
var tblSizes []tableSize
_, err := s.DB.Query(&tblSizes, SQLGetTableSizes)
func (s *PostgresSlurper) expireOldTables(ctx context.Context) error {
rows, err := s.DB.Query(ctx, SQLGetTableSizes)
if err != nil {
s.Logger.Warn("error determining table sizes", err)
return err
}
defer rows.Close()
var tblSizes []tableSize
for rows.Next() {
var t tableSize
if err := rows.Scan(&t.Table, &t.Size); err != nil {
s.Logger.WithError(err).Warn("error scanning table size row")
return err
}
tblSizes = append(tblSizes, t)
}
totalSize := int64(0)
for _, v := range tblSizes {
totalSize += v.Size
Expand All @@ -108,7 +128,7 @@ func (s *PostgresSlurper) expireOldTables() error {
"table": v.Table,
"size": v.Size,
}).Info("table expired")
_, err = s.DB.Exec(fmt.Sprintf(`DROP TABLE "%s";`, v.Table))
_, err = s.DB.Exec(ctx, fmt.Sprintf(`DROP TABLE "%s";`, v.Table))
if err != nil {
s.Logger.WithFields(log.Fields{
"table": v.Table,
Expand All @@ -122,26 +142,26 @@ func (s *PostgresSlurper) expireOldTables() error {
return nil
}

// indexFunc receives table names from s.IndexChan and, for each one, creates
// the table's indexes and then expires old tables. It returns only once
// s.IndexChan has been closed and drained.
//
// NOTE(review): this span is a rendered diff hunk without +/- markers, so the
// pre-change and post-change versions of each modified line appear back to
// back. The ctx-taking variants are presumably the added lines — confirm
// against the actual file.
//
// NOTE(review): ctx is only handed to the database calls below; the range
// loop itself does not watch ctx.Done(), so cancelling ctx does not by itself
// end this loop while it is waiting for the next table name.
func (s *PostgresSlurper) indexFunc() {
func (s *PostgresSlurper) indexFunc(ctx context.Context) {
for tblToIndex := range s.IndexChan {
s.Logger.WithFields(log.Fields{
"table": tblToIndex,
}).Info("creating indexes")
// The table name is substituted into all five placeholders of SQLIndex.
idxSQL := fmt.Sprintf(SQLIndex, tblToIndex, tblToIndex, tblToIndex,
tblToIndex, tblToIndex)
_, idxErr := s.DB.Exec(idxSQL)
_, idxErr := s.DB.Exec(ctx, idxSQL)
// A failed index creation is only logged (at Info level); the loop still
// proceeds to expire old tables for this iteration.
if idxErr != nil {
s.Logger.WithFields(log.Fields{
"table": tblToIndex,
"error": idxErr.Error(),
}).Info("error creating index")
}
s.Logger.Info("expiring old tables")
// NOTE(review): expireOldTables returns an error, which is discarded here.
s.expireOldTables()
s.expireOldTables(ctx)
}
}

func (s *PostgresSlurper) slurpPostgres(eventchan chan types.Entry) {
func (s *PostgresSlurper) slurpPostgres(ctx context.Context, eventchan chan types.Entry) {
cnt := 0
var copybuf bytes.Buffer
for {
Expand All @@ -163,8 +183,14 @@ func (s *PostgresSlurper) slurpPostgres(eventchan chan types.Entry) {
"to": newTableName,
}).Info("rotating tables")
}
crSQL := fmt.Sprintf(SQLCreate, newTableName, newTableName, s.DB.Options().User)
_, crErr := s.DB.Exec(crSQL)
crSQL := fmt.Sprintf(SQLCreate, newTableName, newTableName, s.DBUser)
var crErr error
// Use ExecFn if provided, otherwise use DB.Exec
if s.ExecFn != nil {
crErr = s.ExecFn(ctx, crSQL)
} else {
_, crErr = s.DB.Exec(ctx, crSQL)
}
if crErr != nil {
s.Logger.WithFields(log.Fields{
"table": newTableName,
Expand All @@ -178,8 +204,23 @@ func (s *PostgresSlurper) slurpPostgres(eventchan chan types.Entry) {
s.LastRotatedTime = time.Now()
}
cnt = 0
// Use a reader-based COPY to stream the entire buffer
sql := fmt.Sprintf(SQLCopy, s.CurrentTableName)
r := strings.NewReader(copybuf.String())
_, err := s.DB.CopyFrom(r, fmt.Sprintf(SQLCopy, s.CurrentTableName))
var err error
// Use CopyFn if provided; otherwise acquire a pooled connection and run the COPY via its underlying PgConn
if s.CopyFn != nil {
_, err = s.CopyFn(ctx, s.DB, sql, r)
} else {
conn, acqErr := s.DB.Acquire(ctx)
if acqErr != nil {
s.Logger.WithError(acqErr).Warn("failed to acquire connection for COPY")
copybuf.Reset()
continue
}
_, err = conn.Conn().PgConn().CopyFrom(ctx, r, sql)
conn.Release()
}
if err != nil {
s.Logger.Warn(err)
} else {
Expand All @@ -195,12 +236,12 @@ func (s *PostgresSlurper) slurpPostgres(eventchan chan types.Entry) {
}

// Run starts a PostgresSlurper. It (re)creates s.IndexChan as a buffered
// channel with capacity 1000, launches indexFunc and slurpPostgres as
// goroutines, and returns without waiting for either of them.
//
// NOTE(review): this span is a rendered diff hunk without +/- markers, so the
// pre-change and post-change versions of each modified line appear back to
// back. The ctx-taking variants are presumably the added lines — confirm
// against the actual file.
func (s *PostgresSlurper) Run(eventchan chan types.Entry) {
func (s *PostgresSlurper) Run(ctx context.Context, eventchan chan types.Entry) {
// start indexer thread
s.IndexChan = make(chan string, 1000)
go s.indexFunc()
// The same ctx is forwarded to both goroutines.
go s.indexFunc(ctx)
// run slurper thread
go s.slurpPostgres(eventchan)
go s.slurpPostgres(ctx, eventchan)
}

// Finish is a null operation in the PostgresSlurper implementation.
Expand Down
Loading