Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 26 additions & 39 deletions go/cmd/gitter/gitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ func runCmd(ctx context.Context, dir string, env []string, name string, args ...
return nil
}

// runWithSemaphore runs function after waiting at semaphore for concurrency control
func runWithSemaphore(ctx context.Context, f func() (any, error)) (any, error) {
select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))

return f()
case <-ctx.Done():
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
return nil, ctx.Err()
}
}

func isLocalRequest(r *http.Request) bool {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
Expand Down Expand Up @@ -305,7 +319,9 @@ func marshalResponse(r *http.Request, m proto.Message) ([]byte, error) {

func doFetch(ctx context.Context, w http.ResponseWriter, repoURL string, forceUpdate bool) error {
_, err, _ := gFetch.Do(repoURL, func() (any, error) {
return nil, FetchRepo(ctx, repoURL, forceUpdate)
return runWithSemaphore(ctx, func() (any, error) {
return nil, FetchRepo(ctx, repoURL, forceUpdate)
})
})
if err != nil {
logger.ErrorContext(ctx, "Error fetching blob", slog.Any("error", err))
Expand Down Expand Up @@ -341,11 +357,13 @@ func getFreshRepo(ctx context.Context, w http.ResponseWriter, repoURL string, fo
}

repoAny, err, _ := gLoad.Do(repoPath, func() (any, error) {
repoLock := GetRepoLock(repoURL)
repoLock.RLock()
defer repoLock.RUnlock()
return runWithSemaphore(ctx, func() (any, error) {
repoLock := GetRepoLock(repoURL)
repoLock.RLock()
defer repoLock.RUnlock()

return LoadRepository(ctx, repoPath)
return LoadRepository(ctx, repoPath)
})
})
if err != nil {
logger.ErrorContext(ctx, "Failed to load repository", slog.Any("error", err))
Expand Down Expand Up @@ -568,25 +586,16 @@ func gitHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), urlKey, repoURL)
logger.InfoContext(ctx, "Received request: /git", slog.Bool("forceUpdate", forceUpdate), slog.String("remoteAddr", r.RemoteAddr))

select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
case <-ctx.Done():
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
http.Error(w, "Server context cancelled", http.StatusServiceUnavailable)

return
}
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))

// Fetch repo first
if err := doFetch(ctx, w, repoURL, forceUpdate); err != nil {
return
}

// Archive repo
fileDataAny, err, _ := gArchive.Do(repoURL, func() (any, error) {
return ArchiveRepo(ctx, repoURL)
return runWithSemaphore(ctx, func() (any, error) {
return ArchiveRepo(ctx, repoURL)
})
})
if err != nil {
logger.ErrorContext(ctx, "Error archiving blob", slog.Any("error", err))
Expand Down Expand Up @@ -626,17 +635,6 @@ func cacheHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), urlKey, repoURL)
logger.InfoContext(ctx, "Received request: /cache")

select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
case <-ctx.Done():
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
http.Error(w, "Server context cancelled", http.StatusServiceUnavailable)

return
}
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))

if _, err := getFreshRepo(ctx, w, repoURL, body.GetForceUpdate()); err != nil {
return
}
Expand Down Expand Up @@ -687,17 +685,6 @@ func affectedCommitsHandler(w http.ResponseWriter, r *http.Request) {
slog.Bool("considerAllBranches", considerAllBranches),
)

select {
case semaphore <- struct{}{}:
defer func() { <-semaphore }()
case <-ctx.Done():
logger.WarnContext(ctx, "Request cancelled while waiting for semaphore")
http.Error(w, "Server context cancelled", http.StatusServiceUnavailable)

return
}
logger.DebugContext(ctx, "Concurrent requests", slog.Int("count", len(semaphore)))

repo, err := getFreshRepo(ctx, w, repoURL, body.GetForceUpdate())
if err != nil {
return
Expand Down
15 changes: 12 additions & 3 deletions go/cmd/gitter/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,16 @@ func loadLastFetchMap() {
logger.Info("Loaded lastFetch map", slog.Int("entry_count", len(lastFetch)))
}

func saveRepositoryCache(cachePath string, repo *Repository) error {
func saveRepositoryCache(cachePath string, repo *Repository) (int, error) {
logger.Info("Saving repository cache", slog.String("path", cachePath))

cache := &pb.RepositoryCache{}
emptyPatchID := SHA1{}
for _, commit := range repo.commits {
// Only save commits that have a patch ID
if commit.PatchID == emptyPatchID {
continue
}
cache.Commits = append(cache.Commits, &pb.CommitDetail{
Hash: commit.Hash[:],
PatchId: commit.PatchID[:],
Expand All @@ -99,10 +104,14 @@ func saveRepositoryCache(cachePath string, repo *Repository) error {

data, err := proto.Marshal(cache)
if err != nil {
return err
return 0, err
}

if err := os.WriteFile(cachePath, data, 0600); err != nil {
return 0, err
}

return os.WriteFile(cachePath, data, 0600)
return len(cache.GetCommits()), nil
}

func loadRepositoryCache(cachePath string) (*pb.RepositoryCache, error) {
Expand Down
12 changes: 8 additions & 4 deletions go/cmd/gitter/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,21 @@ func LoadRepository(ctx context.Context, repoPath string) (*Repository, error) {
return nil, fmt.Errorf("failed to build commit graph: %w", err)
}

var patchIDErr error
if len(newCommits) > 0 {
if err := repo.calculatePatchIDs(ctx, newCommits); err != nil {
return nil, fmt.Errorf("failed to calculate patch id for commits: %w", err)
}
patchIDErr = repo.calculatePatchIDs(ctx, newCommits)
}

// Save cache
if err := saveRepositoryCache(cachePath, repo); err != nil {
patchIDCount, err := saveRepositoryCache(cachePath, repo)
if err != nil {
logger.ErrorContext(ctx, "Failed to save repository cache", slog.Any("err", err))
}

if patchIDErr != nil {
return nil, fmt.Errorf("failed to fully calculate patch id for commits: %w, saved %d", patchIDErr, patchIDCount)
}

return repo, nil
}

Expand Down
Loading