Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/sippy-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ func NewSippyDaemonCommand() *cobra.Command {
// 4 potential GitHub calls per comment gives us a safe buffer
// get comment data, get existing comments, possible delete existing, and adding the comment
// could lower to 3 seconds if we need, most writes likely won't have to delete
processes = append(processes, sippyserver.NewWorkProcessor(dbc,
gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket),
10, bigQueryClient, 5*time.Minute, 5*time.Second, ghCommenter, f.GithubCommenterFlags.CommentProcessingDryRun))
processes = append(processes, sippyserver.NewWorkProcessor(dbc, bigQueryClient, gcsClient.Bucket(f.GoogleCloudFlags.StorageBucket), cacheClient, ghCommenter, 10, 5*time.Minute, 5*time.Second, f.GithubCommenterFlags.CommentProcessingDryRun))
}

daemonServer := sippyserver.NewDaemonServer(processes)
Expand Down
12 changes: 10 additions & 2 deletions cmd/sippy/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,13 @@ func NewLoadCommand() *cobra.Command {
}
}

// likewise get a cache client if possible, though some things operate without it.
cacheClient, cacheErr := f.CacheFlags.GetCacheClient()
if cacheErr != nil {
log.WithError(cacheErr).Info("cache client not available, proceeding without caching")
cacheClient = nil // error hygiene, since we pass this down to quite a few functions
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

releaseConfigs := []sippyv1.Release{}

// initializing a bigquery client different from the normal one
Expand Down Expand Up @@ -195,6 +198,12 @@ func NewLoadCommand() *cobra.Command {
if dbErr != nil {
return errors.Wrap(dbErr, "CRITICAL error getting postgres client which prevents regression-cache loading")
}
if cacheErr != nil {
return errors.Wrap(cacheErr, "couldn't get cache client")
}
if f.CacheFlags.RedisURL == "" {
return fmt.Errorf("--redis-url is required")
}

views, err := f.ComponentReadinessFlags.ParseViewsFile()
if err != nil {
Expand Down Expand Up @@ -321,9 +330,8 @@ func NewLoadCommand() *cobra.Command {
elapsed := time.Since(start)
log.WithField("elapsed", elapsed).Info("database load complete")

pinnedTime := f.DBFlags.GetPinnedTime()
if refreshMatviews && !f.SkipMatviewRefresh {
sippyserver.RefreshData(dbc, pinnedTime, false)
sippyserver.RefreshData(dbc, cacheClient, false)
}

elapsed = time.Since(start)
Expand Down
16 changes: 13 additions & 3 deletions cmd/sippy/refresh.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

Expand All @@ -10,17 +11,20 @@ import (

// RefreshFlags collects the CLI flag groups used by the refresh command.
type RefreshFlags struct {
DBFlags *flags.PostgresFlags // postgres connection settings
CacheFlags *flags.CacheFlags // cache connection settings (used to record matview refresh timestamps)
RefreshOnlyIfEmpty bool // when true, only refresh matviews that are currently empty
}

// NewRefreshFlags constructs RefreshFlags with the default Postgres and
// cache flag sets; RefreshOnlyIfEmpty defaults to false.
//
// Note: the scraped diff contained both the old and new `DBFlags:` lines,
// which is a duplicate key in the struct literal; this is the reconstructed
// final version consistent with the added CacheFlags field.
func NewRefreshFlags() *RefreshFlags {
	return &RefreshFlags{
		DBFlags:    flags.NewPostgresDatabaseFlags(),
		CacheFlags: flags.NewCacheFlags(),
	}
}

// BindFlags registers the postgres, cache, and refresh-behavior flags on the
// given flag set.
func (f *RefreshFlags) BindFlags(fs *pflag.FlagSet) {
f.DBFlags.BindFlags(fs)
f.CacheFlags.BindFlags(fs)
fs.BoolVar(&f.RefreshOnlyIfEmpty, "refresh-only-if-empty", f.RefreshOnlyIfEmpty, "only refresh matviews if they're empty")
}

Expand All @@ -35,8 +39,14 @@ func NewRefreshCommand() *cobra.Command {
if err != nil {
return err
}
pinnedDateTime := f.DBFlags.GetPinnedTime()
sippyserver.RefreshData(dbc, pinnedDateTime, f.RefreshOnlyIfEmpty)
cacheClient, cacheErr := f.CacheFlags.GetCacheClient()
if cacheErr != nil {
logrus.WithError(cacheErr).Warn("failed to get cache client")
cacheClient = nil
} else if cacheClient == nil {
logrus.Warn("no cache provided; refresh will not update cached timestamps, so cached data may not be properly invalidated")
}
sippyserver.RefreshData(dbc, cacheClient, f.RefreshOnlyIfEmpty)
return nil
},
}
Expand Down
12 changes: 11 additions & 1 deletion cmd/sippy/seed_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

type SeedDataFlags struct {
DBFlags *flags.PostgresFlags
CacheFlags *flags.CacheFlags
InitDatabase bool
Releases []string
JobsPerRelease int
Expand All @@ -30,6 +31,7 @@ type SeedDataFlags struct {
func NewSeedDataFlags() *SeedDataFlags {
return &SeedDataFlags{
DBFlags: flags.NewPostgresDatabaseFlags(),
CacheFlags: flags.NewCacheFlags(),
Releases: []string{"5.0", "4.22", "4.21"}, // Default releases
JobsPerRelease: 3, // Default jobs per release
TestNames: []string{
Expand All @@ -48,6 +50,7 @@ func NewSeedDataFlags() *SeedDataFlags {

func (f *SeedDataFlags) BindFlags(fs *pflag.FlagSet) {
f.DBFlags.BindFlags(fs)
f.CacheFlags.BindFlags(fs)
fs.BoolVar(&f.InitDatabase, "init-database", false, "Initialize the DB schema before seeding data")
fs.StringSliceVar(&f.Releases, "release", f.Releases, "Releases to create ProwJobs for (can be specified multiple times)")
fs.IntVar(&f.JobsPerRelease, "jobs", f.JobsPerRelease, "Number of ProwJobs to create for each release")
Expand Down Expand Up @@ -86,6 +89,13 @@ rolled off the 1 week window.
log.Info("Database schema initialized successfully")
}

cacheClient, cacheErr := f.CacheFlags.GetCacheClient()
if cacheErr != nil {
return fmt.Errorf("failed to get cache client: %v", cacheErr)
} else if cacheClient == nil {
log.Warn("no cache provided; refresh timestamps will not be cached")
}

log.Info("Starting to seed test data...")

// Create the test suite
Expand Down Expand Up @@ -131,7 +141,7 @@ rolled off the 1 week window.
totalTestResults := totalRuns * len(f.TestNames)

log.Info("Refreshing materialized views...")
sippyserver.RefreshData(dbc, nil, false)
sippyserver.RefreshData(dbc, cacheClient, false)

log.Infof("Successfully seeded test data! Created %d ProwJobs, %d Tests, %d ProwJobRuns, and %d test results",
totalProwJobs, len(f.TestNames), totalRuns, totalTestResults)
Expand Down
80 changes: 80 additions & 0 deletions pkg/api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func CacheSet[T any](ctx context.Context, c cache.Cache, result T, cacheKey []by
func CalculateRoundedCacheDuration(cacheOptions cache.RequestOptions) time.Duration {
// require cacheDuration for persistence logic
cacheDuration := defaultCacheDuration
if cacheOptions.Expiry > 0 {
cacheDuration = cacheOptions.Expiry
}
if cacheOptions.CRTimeRoundingFactor > 0 {
now := time.Now().UTC()
// Only cache until the next rounding duration
Expand All @@ -158,3 +161,80 @@ func isStructWithNoPublicFields(v interface{}) bool {
}
return true
}

// GetDataFromCacheOrMatview caches data that is based on a matview and invalidates it when the matview is refreshed
func GetDataFromCacheOrMatview[T any](ctx context.Context,
cacheClient cache.Cache, cacheSpec CacheSpec,
matview string,
cacheDuration time.Duration,
generateFn func(context.Context) (T, []error),
defaultVal T,
) (T, []error) {
if cacheClient == nil {
return generateFn(ctx)
}

cacheKey, err := cacheSpec.GetCacheKey()
if err != nil {
return defaultVal, []error{err}
}

// If someone gives us an uncacheable cacheKey, panic so it gets detected in testing
if len(cacheKey) == 0 {
panic(fmt.Sprintf("cache key is empty for %s", reflect.TypeOf(defaultVal)))
}
// If someone gives us an uncacheable value, panic so it gets detected in testing
if isStructWithNoPublicFields(defaultVal) {
panic(fmt.Sprintf("cannot cache type %s that exports no fields", reflect.TypeOf(defaultVal)))
}

var cacheVal struct {
Val T // the actual value we want to cache
Timestamp time.Time // the time when it was cached (for comparison to matview refresh time)
}
if cached, err := cacheClient.Get(ctx, string(cacheKey), 0); err == nil {
logrus.WithFields(logrus.Fields{
"key": string(cacheKey),
"type": reflect.TypeOf(defaultVal).String(),
}).Debugf("cache hit")

if err := json.Unmarshal(cached, &cacheVal); err != nil {
logrus.WithError(err).Warnf("failed to unmarshal cached item. cacheKey=%+v", cacheKey)
// fall through to generate the data instead
} else {
// look up when the matview was refreshed to see if the cached value is stale
var lastRefresh time.Time
if lastRefreshBytes, err := cacheClient.Get(ctx, RefreshMatviewKey(matview), 0); err == nil {
if parsed, err := time.Parse(time.RFC3339, string(lastRefreshBytes)); err != nil {
logrus.WithError(err).Warnf("failed to parse matview refresh timestamp %q for %q; cache will not be invalidated", lastRefreshBytes, matview)
} else {
lastRefresh = parsed
}
}

if lastRefresh.Before(cacheVal.Timestamp) {
// not invalidated by a newer refresh, so use it (if we don't know the last refresh, still use it)
return cacheVal.Val, nil
}
logrus.Debugf("matview %q refreshed at %v, will not use earlier cache entry from %v", matview, lastRefresh, cacheVal.Timestamp)
}
} else if strings.Contains(err.Error(), "connection refused") {
logrus.WithError(err).Fatalf("redis URL specified but got connection refused; exiting due to cost issues in this configuration")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fatal in a library is brutal 😁

could be a networking blip but I guess we can robustify if we run into this too often

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i do not know the history of why it was done this way and it did not seem likely to be a common problem that a blip would kill the server (but maybe it's been happening all the time and we just didn't notice because they're deployed redundantly)

} else {
logrus.WithFields(logrus.Fields{"key": string(cacheKey)}).Debugf("cache miss")
}

// Cache missed or refresh invalidated the data, so generate it.
logrus.Debugf("cache duration set to %s or approx %s for key %s", cacheDuration, time.Now().Add(cacheDuration).Format(time.RFC3339), cacheKey)
result, errs := generateFn(ctx)
if len(errs) == 0 {
cacheVal.Val = result
cacheVal.Timestamp = time.Now().UTC()
CacheSet(ctx, cacheClient, cacheVal, cacheKey, cacheDuration)
}
return result, errs
}

// RefreshMatviewKey returns the cache key under which the last-refresh
// timestamp for the given materialized view is stored.
func RefreshMatviewKey(matview string) string {
	const refreshKeyPrefix = "matview_refreshed:"
	return refreshKeyPrefix + matview
}
Loading