diff --git a/go/cmd/gitter/gitter.go b/go/cmd/gitter/gitter.go index f1f9e8aa1fc..cfaa74928dc 100644 --- a/go/cmd/gitter/gitter.go +++ b/go/cmd/gitter/gitter.go @@ -204,6 +204,20 @@ func runCmd(ctx context.Context, dir string, env []string, name string, args ... return nil } +// runWithSemaphore runs function after waiting at semaphore for concurrency control +func runWithSemaphore(ctx context.Context, f func() (any, error)) (any, error) { + select { + case semaphore <- struct{}{}: + defer func() { <-semaphore }() + logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore))) + + return f() + case <-ctx.Done(): + logger.WarnContext(ctx, "Request cancelled while waiting for semaphore") + return nil, ctx.Err() + } +} + func isLocalRequest(r *http.Request) bool { host, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { @@ -305,7 +319,9 @@ func marshalResponse(r *http.Request, m proto.Message) ([]byte, error) { func doFetch(ctx context.Context, w http.ResponseWriter, repoURL string, forceUpdate bool) error { _, err, _ := gFetch.Do(repoURL, func() (any, error) { - return nil, FetchRepo(ctx, repoURL, forceUpdate) + return runWithSemaphore(ctx, func() (any, error) { + return nil, FetchRepo(ctx, repoURL, forceUpdate) + }) }) if err != nil { logger.ErrorContext(ctx, "Error fetching blob", slog.Any("error", err)) @@ -341,11 +357,13 @@ func getFreshRepo(ctx context.Context, w http.ResponseWriter, repoURL string, fo } repoAny, err, _ := gLoad.Do(repoPath, func() (any, error) { - repoLock := GetRepoLock(repoURL) - repoLock.RLock() - defer repoLock.RUnlock() + return runWithSemaphore(ctx, func() (any, error) { + repoLock := GetRepoLock(repoURL) + repoLock.RLock() + defer repoLock.RUnlock() - return LoadRepository(ctx, repoPath) + return LoadRepository(ctx, repoPath) + }) }) if err != nil { logger.ErrorContext(ctx, "Failed to load repository", slog.Any("error", err)) @@ -568,17 +586,6 @@ func gitHandler(w http.ResponseWriter, r *http.Request) { ctx := context.WithValue(r.Context(), urlKey, repoURL) logger.InfoContext(ctx, "Received request: /git", slog.Bool("forceUpdate", forceUpdate), slog.String("remoteAddr", r.RemoteAddr)) - select { - case semaphore <- struct{}{}: - defer func() { <-semaphore }() - case <-ctx.Done(): - logger.WarnContext(ctx, "Request cancelled while waiting for semaphore") - http.Error(w, "Server context cancelled", http.StatusServiceUnavailable) - - return - } - logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore))) - // Fetch repo first if err := doFetch(ctx, w, repoURL, forceUpdate); err != nil { return @@ -586,7 +593,9 @@ func gitHandler(w http.ResponseWriter, r *http.Request) { // Archive repo fileDataAny, err, _ := gArchive.Do(repoURL, func() (any, error) { - return ArchiveRepo(ctx, repoURL) + return runWithSemaphore(ctx, func() (any, error) { + return ArchiveRepo(ctx, repoURL) + }) }) if err != nil { logger.ErrorContext(ctx, "Error archiving blob", slog.Any("error", err)) @@ -626,17 +635,6 @@ func cacheHandler(w http.ResponseWriter, r *http.Request) { ctx := context.WithValue(r.Context(), urlKey, repoURL) logger.InfoContext(ctx, "Received request: /cache") - select { - case semaphore <- struct{}{}: - defer func() { <-semaphore }() - case <-ctx.Done(): - logger.WarnContext(ctx, "Request cancelled while waiting for semaphore") - http.Error(w, "Server context cancelled", http.StatusServiceUnavailable) - - return - } - logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore))) - if _, err := getFreshRepo(ctx, w, repoURL, body.GetForceUpdate()); err != nil { return } @@ -687,17 +685,6 @@ func affectedCommitsHandler(w http.ResponseWriter, r *http.Request) { slog.Bool("considerAllBranches", considerAllBranches), ) - select { - case semaphore <- struct{}{}: - defer func() { <-semaphore }() - case <-ctx.Done(): - logger.WarnContext(ctx, "Request cancelled while waiting for semaphore") - http.Error(w, "Server context cancelled", http.StatusServiceUnavailable) - - return - } - logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore))) - repo, err := getFreshRepo(ctx, w, repoURL, body.GetForceUpdate()) if err != nil { return diff --git a/go/cmd/gitter/persistence.go b/go/cmd/gitter/persistence.go index 67e83d31a69..ac308e09994 100644 --- a/go/cmd/gitter/persistence.go +++ b/go/cmd/gitter/persistence.go @@ -86,11 +86,16 @@ func loadLastFetchMap() { logger.Info("Loaded lastFetch map", slog.Int("entry_count", len(lastFetch))) } -func saveRepositoryCache(cachePath string, repo *Repository) error { +func saveRepositoryCache(cachePath string, repo *Repository) (int, error) { logger.Info("Saving repository cache", slog.String("path", cachePath)) cache := &pb.RepositoryCache{} + emptyPatchID := SHA1{} for _, commit := range repo.commits { + // Only save commits that have a patch ID + if commit.PatchID == emptyPatchID { + continue + } cache.Commits = append(cache.Commits, &pb.CommitDetail{ Hash: commit.Hash[:], PatchId: commit.PatchID[:], @@ -99,10 +104,14 @@ func saveRepositoryCache(cachePath string, repo *Repository) error { data, err := proto.Marshal(cache) if err != nil { - return err + return 0, err + } + + if err := os.WriteFile(cachePath, data, 0600); err != nil { + return 0, err } - return os.WriteFile(cachePath, data, 0600) + return len(cache.GetCommits()), nil } func loadRepositoryCache(cachePath string) (*pb.RepositoryCache, error) { diff --git a/go/cmd/gitter/repository.go b/go/cmd/gitter/repository.go index 150de51c96d..57882e930b5 100644 --- a/go/cmd/gitter/repository.go +++ b/go/cmd/gitter/repository.go @@ -90,17 +90,26 @@ func LoadRepository(ctx context.Context, repoPath string) (*Repository, error) { return nil, fmt.Errorf("failed to build commit graph: %w", err) } + var patchIDErr error if len(newCommits) > 0 { - if err := repo.calculatePatchIDs(ctx, newCommits); err != nil { - return nil, fmt.Errorf("failed to calculate patch id for commits: %w", err) - } + patchIDErr = repo.calculatePatchIDs(ctx, newCommits) + } + + // If error is anything other than context cancel, exit early without saving + if patchIDErr != nil && !errors.Is(ctx.Err(), context.Canceled) { + return nil, fmt.Errorf("failed to calculate patch id for commits: %w", patchIDErr) } // Save cache - if err := saveRepositoryCache(cachePath, repo); err != nil { + patchIDCount, err := saveRepositoryCache(cachePath, repo) + if err != nil { logger.ErrorContext(ctx, "Failed to save repository cache", slog.Any("err", err)) } + if patchIDErr != nil { + return nil, fmt.Errorf("failed to fully calculate patch id for commits: %w, saved %d", patchIDErr, patchIDCount) + } + return repo, nil }