Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 117 additions & 25 deletions pkg/dataloader/prowloader/prow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -167,11 +166,75 @@ func (pl *ProwLoader) Errors() []error {
return pl.errors
}

// PartitionManagementConfig defines partition lifecycle settings for a table
type PartitionManagementConfig struct {
TableName string // Name of the partitioned table
FuturePartitionWindow time.Duration // How far in the future to create partitions
DetachAfter int // Detach partitions older than this many days
DropDetachedAfter int // Drop detached partitions older than this many days
InitialLookbackDays int // Days to look back when initializing a new table
}

// sippy_backup currently deletes data older than 90 days
// this adds grace of 5 days before detaching and
// 10 before deleting. test_analysis_by_job_by_dates
// is not currently managed by sippy_backup
var partitionConfigs = []PartitionManagementConfig{
{
TableName: "test_analysis_by_job_by_dates",
FuturePartitionWindow: 48 * time.Hour,
DetachAfter: 95,
DropDetachedAfter: 100,
InitialLookbackDays: 15,
},
{
TableName: "prow_job_run_tests",
FuturePartitionWindow: 48 * time.Hour,
DetachAfter: 95,
DropDetachedAfter: 100,
InitialLookbackDays: 15,
},
{
TableName: "prow_job_run_test_outputs",
FuturePartitionWindow: 48 * time.Hour,
DetachAfter: 95,
DropDetachedAfter: 100,
InitialLookbackDays: 15,
},
}

func (pl *ProwLoader) updatePartitions(config PartitionManagementConfig) error {
err := pl.agePartitions(config)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error aging %s", config.TableName))
}

err = pl.preparePartitions(config)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("error preparing %s", config.TableName))
}

return nil
}

func (pl *ProwLoader) Load() {
start := time.Now()

log.Infof("started loading prow jobs to DB...")

for _, config := range partitionConfigs {
err := pl.updatePartitions(config)
if err != nil {
pl.errors = append(pl.errors, err)

// if we have errors with partition management we can't be sure that we have created
// the necessary partitions to proceed with loading
// we could possibly differentiate between removing old and creating new but for now
// any failures here block any loading
return
}
}

// Update unmerged PR statuses in case any have merged
if err := pl.syncPRStatus(); err != nil {
pl.errors = append(pl.errors, errors.Wrap(err, "error in syncPRStatus"))
Expand Down Expand Up @@ -331,19 +394,62 @@ func DaysBetween(start, end time.Time) []string {
return days
}

// NextDay takes a date string in YYYY-MM-DD format and returns the date string for the following day.
func NextDay(dateStr string) (string, error) {
// Parse the input date string
date, err := time.Parse("2006-01-02", dateStr)
// agePartitions detaches and drops old partitions based on configuration
func (pl *ProwLoader) agePartitions(config PartitionManagementConfig) error {
detached, err := pl.dbc.DetachOldPartitions(config.TableName, config.DetachAfter, false)
if err != nil {
log.WithError(err).Errorf("error detaching partitions for %s", config.TableName)
return err
}
log.Infof("detached %d partitions from %s", detached, config.TableName)
dropped, err := pl.dbc.DropOldDetachedPartitions(config.TableName, config.DropDetachedAfter, false)
if err != nil {
log.WithError(err).Errorf("error dropping detached partitions for %s", config.TableName)
return err
}
log.Infof("dropped %d detached partitions from %s", dropped, config.TableName)

return nil
}

// preparePartitions creates missing partitions for future data based on configuration
func (pl *ProwLoader) preparePartitions(config PartitionManagementConfig) error {
log.Infof("preparing partitions for %s", config.TableName)
stats, err := pl.dbc.GetAttachedPartitionStats(config.TableName)

if err != nil {
return "", fmt.Errorf("invalid date format: %v", err)
log.WithError(err).Errorf("error getting partition stats for %s", config.TableName)
return err
}
fmt.Printf(" Total: %d partitions (%s)\n", stats.TotalPartitions, stats.TotalSizePretty)

// When initializing a new table, look back the configured number of days
oldestDate := time.Now().Add(-time.Duration(config.InitialLookbackDays) * 24 * time.Hour)
if stats.TotalPartitions > 0 {

// Add one day to the parsed date
nextDay := date.Add(24 * time.Hour)
var startRange, endRange string
if stats.OldestDate.Valid {
startRange = stats.OldestDate.Time.Format("2006-01-02")
oldestDate = stats.OldestDate.Time
}
if stats.NewestDate.Valid {
endRange = stats.OldestDate.Time.Format("2006-01-02")
}
fmt.Printf(" Range: %s to %s\n",
startRange,
endRange)

// Format the next day back to YYYY-MM-DD
return nextDay.Format("2006-01-02"), nil
}

futureDate := time.Now().Add(config.FuturePartitionWindow)
created, err := pl.dbc.CreateMissingPartitions(config.TableName, oldestDate, futureDate, false)
if err != nil {
log.WithError(err).Errorf("error creating partitions for %s", config.TableName)
return err
}

log.Infof("created %d partitions for %s", created, config.TableName)
return nil
}

// loadDailyTestAnalysisByJob loads test analysis data into partitioned tables in postgres, one per
Expand Down Expand Up @@ -382,21 +488,6 @@ func (pl *ProwLoader) loadDailyTestAnalysisByJob(ctx context.Context) error {
dLog := log.WithField("date", dateToImport)

dLog.Infof("Loading test analysis by job daily summaries")
nextDay, err := NextDay(dateToImport)
if err != nil {
return errors.Wrapf(err, "error parsing next day from %s", dateToImport)
}

// create a partition for this date
partitionSQL := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS test_analysis_by_job_by_dates_%s PARTITION OF test_analysis_by_job_by_dates
FOR VALUES FROM ('%s') TO ('%s');`, strings.ReplaceAll(dateToImport, "-", "_"), dateToImport, nextDay)
dLog.Info(partitionSQL)

if res := pl.dbc.DB.Exec(partitionSQL); res.Error != nil {
log.WithError(res.Error).Error("error creating partition")
return res.Error
}
dLog.Warnf("partition created for releases %v", pl.releases)

q := pl.bigQueryClient.Query(ctx, bqlabel.ProwLoaderTestAnalysis, fmt.Sprintf(`WITH
deduped_testcases AS (
Expand Down Expand Up @@ -1250,6 +1341,7 @@ func (pl *ProwLoader) extractTestCases(suite *junit.TestSuite, suiteID *uint, te
continue
}

// interesting that we rely on created_at here which is when we imported the test, not when the test ran
testCases[testCacheKey] = &models.ProwJobRunTest{
TestID: testID,
SuiteID: suiteID,
Expand Down
24 changes: 22 additions & 2 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ func (d *DB) UpdateSchema(reportEnd *time.Time) error {
&models.ProwJobRunAnnotation{},
&models.Test{},
&models.Suite{},
&models.ProwJobRunTest{},
&models.ProwJobRunTestOutput{},
&models.APISnapshot{},
&models.Bug{},
&models.ProwPullRequest{},
Expand Down Expand Up @@ -111,6 +109,27 @@ func (d *DB) UpdateSchema(reportEnd *time.Time) error {
}
}

// handle partitioned model management outside of gorm
partitionedModelsToMigrate := []struct {
model interface{}
tableName string
}{
{
model: &models.ProwJobRunTest{},
tableName: "prow_job_run_tests",
},
{
model: &models.ProwJobRunTestOutput{},
tableName: "prow_job_run_test_outputs",
},
}

for _, partitionedModel := range partitionedModelsToMigrate {
if _, err := d.UpdatePartitionedTable(partitionedModel.model, partitionedModel.tableName, NewRangePartitionConfig("created_at"), false, false); err != nil {
return err
Comment on lines +127 to +129
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

This path only works if the partitioned tables already exist.

UpdatePartitionedTable errors when the target table is missing. Since these models were removed from AutoMigrate, any environment bootstrapping from an empty schema will stop here before the later partition sync can create them.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/db/db.go` around lines 127 - 129, The current loop calls
UpdatePartitionedTable for each partitionedModel in partitionedModelsToMigrate
but fails when the physical partitioned table doesn't exist; modify the flow to
ensure the base partitioned table exists before calling UpdatePartitionedTable
by either invoking the table-creation routine (e.g., call CreatePartitionedTable
or run AutoMigrate for partitionedModel.model/tableName) or update
UpdatePartitionedTable to detect a missing table and create it instead;
reference partitionedModelsToMigrate, UpdatePartitionedTable, AutoMigrate, and
the partition sync so the missing-table creation happens prior to the
UpdatePartitionedTable call to avoid bootstrapping failures.

}
}

if err := createAuditLogIndexes(d.DB); err != nil {
return err
}
Expand All @@ -123,6 +142,7 @@ func (d *DB) UpdateSchema(reportEnd *time.Time) error {
return err
}

// TODO(fbabcock): migrate this to UpdatePartitionedTable
if err := syncPartitionedTables(d.DB); err != nil {
return err
}
Expand Down
Loading