@@ -33,7 +33,6 @@ import (
3333 "os/exec"
3434 "path/filepath"
3535 "runtime"
36- "strconv"
3736 "strings"
3837 "sync"
3938 "time"
@@ -873,18 +872,6 @@ func extractTarZstd(_ context.Context, r io.Reader, dir string) error {
873872 return extractTarPlatform (dec , dir )
874873}
875874
876- // zstdCompressCmd returns the command for zstd compression.
877- // Prefers pzstd (creates parallel frames, decompressable in parallel) and
878- // falls back to zstd -TN -c.
879- func zstdCompressCmd (ctx context.Context ) * exec.Cmd {
880- n := strconv .Itoa (max (1 , runtime .NumCPU ()))
881- if path , err := exec .LookPath ("pzstd" ); err == nil {
882- // -p N = N threads, -c write to stdout
883- return exec .CommandContext (ctx , path , "-p" , n , "-c" ) //nolint:gosec
884- }
885- return exec .CommandContext (ctx , "zstd" , "-T" + n , "-c" ) //nolint:gosec
886- }
887-
888875// cacheExclusions are patterns for files and directories that should never be
889876// included in cache bundles. Patterns with a leading * are suffix-matched;
890877// all others are exact basename matches.
@@ -930,6 +917,11 @@ func isExcludedCache(name string) bool {
930917// Multiple sources map to multiple -C baseDir path entries in the tar command,
931918// which is how we combine caches + configuration-cache + convention build dirs into a single flat
932919// archive.
920+ //
921+ // Compression uses the in-process klauspost/compress/zstd encoder with NumCPU
922+ // goroutines, producing parallel frames that can be decompressed in parallel.
923+ // This eliminates the pzstd/zstd subprocess and the IPC pipe between tar and
924+ // the compressor.
933925func createTarZstd (ctx context.Context , w io.Writer , sources []tarSource ) error {
934926 args := []string {"-chf" , "-" }
935927 for _ , pat := range cacheExclusions {
@@ -940,35 +932,39 @@ func createTarZstd(ctx context.Context, w io.Writer, sources []tarSource) error
940932 args = append (args , "-C" , src .BaseDir , src .Path )
941933 }
942934 tarCmd := exec .CommandContext (ctx , "tar" , args ... ) //nolint:gosec
943- zstdCmd := zstdCompressCmd (ctx )
944935
945936 tarStdout , err := tarCmd .StdoutPipe ()
946937 if err != nil {
947938 return errors .Wrap (err , "tar stdout pipe" )
948939 }
949940
950- var tarStderr , zstdStderr bytes.Buffer
941+ var tarStderr bytes.Buffer
951942 tarCmd .Stderr = & tarStderr
952- zstdCmd .Stdin = tarStdout
953- zstdCmd .Stdout = w
954- zstdCmd .Stderr = & zstdStderr
955943
956944 if err := tarCmd .Start (); err != nil {
957945 return errors .Wrap (err , "start tar" )
958946 }
959- if err := zstdCmd .Start (); err != nil {
960- return errors .Join (errors .Wrap (err , "start zstd" ), tarCmd .Wait ())
947+
948+ enc , err := zstd .NewWriter (w ,
949+ zstd .WithEncoderConcurrency (runtime .NumCPU ()),
950+ zstd .WithWindowSize (zstd .MaxWindowSize ))
951+ if err != nil {
952+ return errors .Join (errors .Wrap (err , "create zstd encoder" ), tarCmd .Wait ())
961953 }
962954
955+ _ , copyErr := io .Copy (enc , tarStdout )
956+ encErr := enc .Close ()
963957 tarErr := tarCmd .Wait ()
964- zstdErr := zstdCmd .Wait ()
965958
966959 var errs []error
967960 if tarErr != nil {
968961 errs = append (errs , errors .Errorf ("tar: %w: %s" , tarErr , tarStderr .String ()))
969962 }
970- if zstdErr != nil {
971- errs = append (errs , errors .Errorf ("zstd: %w: %s" , zstdErr , zstdStderr .String ()))
963+ if copyErr != nil {
964+ errs = append (errs , errors .Wrap (copyErr , "compress stream" ))
965+ }
966+ if encErr != nil {
967+ errs = append (errs , errors .Wrap (encErr , "close zstd encoder" ))
972968 }
973969 return errors .Join (errs ... )
974970}
@@ -1096,33 +1092,20 @@ func touchMarkerFile(path string) error {
10961092// than the system tar command because the file list is already resolved to real paths
10971093// (the caches-dir symlink has been followed by filepath.Walk + EvalSymlinks), so
10981094// symlink dereferencing with -h is not required.
1099- func createDeltaTarZstd (ctx context.Context , w io.Writer , baseDir string , relPaths []string ) error {
1100- zstdCmd := zstdCompressCmd (ctx )
1101-
1102- pr , pw := io .Pipe ()
1103- zstdCmd .Stdin = pr
1104- zstdCmd .Stdout = w
1105- var zstdStderr bytes.Buffer
1106- zstdCmd .Stderr = & zstdStderr
1107-
1108- if err := zstdCmd .Start (); err != nil {
1109- return errors .Wrap (err , "start zstd" )
1095+ //
1096+ // Compression uses the in-process klauspost/compress/zstd encoder.
1097+ func createDeltaTarZstd (_ context.Context , w io.Writer , baseDir string , relPaths []string ) error {
1098+ enc , err := zstd .NewWriter (w ,
1099+ zstd .WithEncoderConcurrency (runtime .NumCPU ()),
1100+ zstd .WithWindowSize (zstd .MaxWindowSize ))
1101+ if err != nil {
1102+ return errors .Wrap (err , "create zstd encoder" )
11101103 }
11111104
1112- // Write the tar stream into the pipe concurrently with zstd compression.
1113- tarErr := writeDeltaTar (pw , baseDir , relPaths )
1114- pw .CloseWithError (tarErr ) //nolint:errcheck,gosec
1105+ tarErr := writeDeltaTar (enc , baseDir , relPaths )
1106+ encErr := enc .Close ()
11151107
1116- zstdErr := zstdCmd .Wait ()
1117-
1118- var errs []error
1119- if tarErr != nil {
1120- errs = append (errs , tarErr )
1121- }
1122- if zstdErr != nil {
1123- errs = append (errs , errors .Errorf ("zstd: %w: %s" , zstdErr , zstdStderr .String ()))
1124- }
1125- return errors .Join (errs ... )
1108+ return errors .Join (tarErr , encErr )
11261109}
11271110
11281111// writeDeltaTar writes a tar stream for the specified files to w.
0 commit comments