Skip to content

Commit 6ef700c

Browse files
committed
Optimize S3 parallel download pipeline
Tune the HTTP transport and download parameters based on benchmarking against CI hosts (m6id.4xlarge, 16 vCPU):

- Configure the HTTP transport with connection pooling (MaxIdleConnsPerHost) and 128 KiB read/write buffers for better connection reuse across parallel range workers.
- Default to max(16, NumCPU) download workers with 32 MiB chunks. This configuration consistently achieved ~245 MB/s on fast hosts vs ~100 MB/s for single-stream downloads (the per-connection S3 cap).
- Add an 8 MiB bufio buffer between the S3 download pipe and pzstd to decouple network I/O from decompression stalls.
- Log the full commit hash on cache hit for easier debugging.
1 parent 4fe6ae9 commit 6ef700c

2 files changed

Lines changed: 36 additions & 25 deletions

File tree

cmd/gradle-cache/main.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
//
33
// Base bundles are stored at s3://{bucket}/{commit}/{cache-key}/{bundle-file},
44
// where bundle-file is the cache key with colons replaced by dashes + ".tar.zst".
5-
// This format is compatible with the bundled-cache-manager Ruby script.
65
//
76
// On restore, the tool walks the local git history (counting distinct-author
87
// "blocks") to find the most recent S3 hit, downloads it, and extracts it
@@ -109,7 +108,7 @@ type RestoreCmd struct {
109108
MaxBlocks int `help:"Number of distinct-author commit blocks to search." default:"20"`
110109
GradleUserHome string `help:"Path to GRADLE_USER_HOME." env:"GRADLE_USER_HOME"`
111110
IncludedBuilds []string `help:"Included build directories whose build/ output to restore (relative to project root). Use 'dir/*' to restore build/ for all subdirectories. May be repeated." name:"included-build"`
112-
Branch string `help:"Branch name to also apply a delta bundle for (typically $$BRANCH_NAME). The delta download runs concurrently with base extraction." optional:""`
111+
Branch string `help:"Branch name to also apply a delta bundle for (typically $BRANCH_NAME). The delta download runs concurrently with base extraction." optional:""`
113112
}
114113

115114
func (c *RestoreCmd) AfterApply() error {
@@ -169,7 +168,7 @@ func (c *RestoreCmd) Run(ctx context.Context, metrics metricsClient) error {
169168
slog.Info("no cache bundle found in history")
170169
return nil
171170
}
172-
slog.Info("cache hit", "commit", hitCommit[:min(8, len(hitCommit))], "cache-key", c.CacheKey)
171+
slog.Info("cache hit", "commit", hitCommit, "cache-key", c.CacheKey)
173172

174173
// ── Delta pre-fetch (concurrent with base extraction) ─────────────────────
175174
// If --branch is set, kick off a goroutine that stats + downloads the delta
@@ -346,7 +345,10 @@ type extractRule struct {
346345
// merged rather than replaced.
347346
func extractBundleZstd(ctx context.Context, r io.Reader, rules []extractRule, defaultDir string) error {
348347
zstdCmd := zstdDecompressCmd(ctx)
349-
zstdCmd.Stdin = r
348+
// Buffer between S3 download and pzstd to decouple network I/O from
349+
// decompression. Without this, any momentary pause in pzstd (context
350+
// switch, hard block) stalls the S3 read on the synchronous pipe.
351+
zstdCmd.Stdin = bufio.NewReaderSize(r, 8<<20)
350352

351353
var zstdStderr bytes.Buffer
352354
zstdCmd.Stderr = &zstdStderr
@@ -745,7 +747,7 @@ func setupLogger(level string) {
745747
slog.SetDefault(slog.New(handler))
746748
}
747749

748-
// bundleFilename converts a cache key to its S3 filename, matching the Ruby bundled-cache-manager.
750+
// bundleFilename converts a cache key to its S3 filename.
749751
func bundleFilename(cacheKey string) string {
750752
return strings.ReplaceAll(cacheKey, ":", "-") + ".tar.zst"
751753
}
@@ -843,7 +845,7 @@ func (c *countingBody) Read(p []byte) (int, error) {
843845

844846
func extractTarZstd(ctx context.Context, r io.Reader, dir string) error {
845847
zstdCmd := zstdDecompressCmd(ctx)
846-
zstdCmd.Stdin = r
848+
zstdCmd.Stdin = bufio.NewReaderSize(r, 8<<20)
847849

848850
var zstdStderr bytes.Buffer
849851
zstdCmd.Stderr = &zstdStderr

cmd/gradle-cache/s3.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"encoding/xml"
1111
"fmt"
1212
"io"
13+
"log/slog"
1314
"net/http"
1415
"net/url"
1516
"os"
@@ -33,20 +34,34 @@ type awsCreds struct {
3334

3435
// s3Client is a minimal AWS S3 client supporting HeadObject, GetObject, and PutObject.
3536
type s3Client struct {
36-
region string
37-
creds awsCreds
38-
http *http.Client
37+
region string
38+
creds awsCreds
39+
http *http.Client
40+
chunkSize int64
41+
dlWorkers int
3942
}
4043

4144
func newS3Client(region string) (*s3Client, error) {
4245
creds, err := resolveAWSCredentials(region)
4346
if err != nil {
4447
return nil, errors.Wrap(err, "resolve AWS credentials")
4548
}
49+
workers := max(defaultDownloadWorkers, runtime.NumCPU())
50+
51+
transport := &http.Transport{
52+
MaxIdleConnsPerHost: workers,
53+
WriteBufferSize: 128 << 10,
54+
ReadBufferSize: 128 << 10,
55+
}
56+
57+
slog.Debug("s3 client config", "workers", workers, "chunk_mb", defaultDownloadChunkSize>>20)
58+
4659
return &s3Client{
47-
region: region,
48-
creds: creds,
49-
http: &http.Client{},
60+
region: region,
61+
creds: creds,
62+
http: &http.Client{Transport: transport},
63+
chunkSize: defaultDownloadChunkSize,
64+
dlWorkers: workers,
5065
}, nil
5166
}
5267

@@ -71,14 +86,8 @@ func (c *s3Client) stat(ctx context.Context, bucket, key string) (int64, error)
7186
}
7287

7388
const (
74-
// downloadChunkSize is the size of each parallel range request.
75-
// 32 MiB gives ~8 in-flight buffers = 256 MiB peak memory, matching the
76-
// AWS S3 Transfer Manager default.
77-
downloadChunkSize = 32 << 20
78-
// downloadWorkers is the number of concurrent range requests.
79-
// max(8, NumCPU) saturates S3 bandwidth on CI instances where a single
80-
// TCP flow is throttled well below the available network capacity.
81-
downloadWorkers = 8
89+
defaultDownloadChunkSize = 32 << 20
90+
defaultDownloadWorkers = 16
8291
)
8392

8493
// get downloads an object and returns its body as a streaming ReadCloser.
@@ -89,7 +98,7 @@ const (
8998
// The caller must close the returned reader.
9099
func (c *s3Client) get(ctx context.Context, bucket, key string, size int64) (io.ReadCloser, error) {
91100
// Small object or unknown size: single-stream GET.
92-
if size <= downloadChunkSize {
101+
if size <= c.chunkSize {
93102
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.objectURL(bucket, key), nil)
94103
if err != nil {
95104
return nil, err
@@ -121,8 +130,8 @@ func (c *s3Client) get(ctx context.Context, bucket, key string, size int64) (io.
121130
// stays active at full speed. All workers run concurrently, saturating the
122131
// available S3 bandwidth. Peak memory is numWorkers × c.chunkSize.
123132
func (c *s3Client) parallelGet(ctx context.Context, bucket, key string, size int64, w io.Writer) error {
124-
numChunks := int((size + downloadChunkSize - 1) / downloadChunkSize)
125-
numWorkers := max(downloadWorkers, runtime.NumCPU())
133+
numChunks := int((size + c.chunkSize - 1) / c.chunkSize)
134+
numWorkers := max(c.dlWorkers, runtime.NumCPU())
126135

127136
type chunkResult struct {
128137
data []byte
@@ -149,8 +158,8 @@ func (c *s3Client) parallelGet(ctx context.Context, bucket, key string, size int
149158
go func() {
150159
defer wg.Done()
151160
for seq := range work {
152-
start := int64(seq) * downloadChunkSize
153-
end := min(start+downloadChunkSize-1, size-1)
161+
start := int64(seq) * c.chunkSize
162+
end := min(start+c.chunkSize-1, size-1)
154163

155164
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.objectURL(bucket, key), nil)
156165
if err != nil {

0 commit comments

Comments
 (0)