diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index df57e113..ec8060f1 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -51,4 +51,47 @@ jobs: run: go generate ./... - name: Test - run: go test ./... + run: go test $(go list ./... | grep -v bench_tests) + + benchmark: + runs-on: blacksmith-4vcpu-ubuntu-2404 + + steps: + - uses: actions/checkout@v6 + + - name: Get latest Kubo version + id: kubo + run: echo "version=$(gh release list --repo ipfs/kubo --limit 1 | awk '{print $1}')" >> $GITHUB_OUTPUT + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Download Kubo + run: wget https://github.com/ipfs/kubo/releases/download/${{ steps.kubo.outputs.version }}/kubo_${{ steps.kubo.outputs.version }}_linux-amd64.tar.gz + + - name: Extract and install Kubo + run: | + tar -xzf kubo_${{ steps.kubo.outputs.version }}_linux-amd64.tar.gz + sudo cp kubo/ipfs /usr/local/bin/ + rm -rf kubo kubo_${{ steps.kubo.outputs.version }}_linux-amd64.tar.gz + ipfs init + + - name: Add /usr/local/bin to PATH + run: echo "/usr/local/bin" >> $GITHUB_PATH + + - name: Setup Go + uses: actions/setup-go@v6 + with: + go-version-file: go.mod + cache: true + + - name: Run Benchmarks + run: | + mkdir -p benchmark-results + TUS_BENCH_ENABLE_PROFILING=true TUS_BENCH_PIN_TIMEOUT=0 TUS_BENCH_PROFILES_DIR=benchmark-results go test -bench=. -benchmem -run=^$ ./internal/protocol/bench_tests/ + + - name: Upload Benchmark Results + uses: actions/upload-artifact@v4 + with: + name: benchmark-results + path: benchmark-results/ + retention-days: 30 diff --git a/internal/protocol/bench_tests/op_tus_upload_bench_test.go b/internal/protocol/bench_tests/op_tus_upload_bench_test.go new file mode 100644 index 00000000..2ede9577 --- /dev/null +++ b/internal/protocol/bench_tests/op_tus_upload_bench_test.go @@ -0,0 +1,215 @@ +package bench_tests + +import ( + "fmt" + "math/rand" + "os" + "path/filepath" + "runtime" + "testing" + "time" + + "runtime/pprof" + + "github.com/docker/go-units" + + pluginCore "go.lumeweb.com/portal-plugin-ipfs/core" + "go.lumeweb.com/portal-plugin-ipfs/internal" + "go.lumeweb.com/portal-plugin-ipfs/internal/db" + "go.lumeweb.com/portal/db/models" + tusTestUtils "go.lumeweb.com/portal-plugin-ipfs/internal/testing/tus" + "go.lumeweb.com/portal-plugin-ipfs/internal/testing/testopts" + "go.lumeweb.com/portal/core" + coreTesting "go.lumeweb.com/portal/core/testing" + "go.lumeweb.com/queryutil" + "go.lumeweb.com/queryutil/filter" +) + +func createLargeTestFile(tb coreTesting.TB, size int64) *os.File { + tb.Helper() + + tempDir := tb.TempDir() + filePath := filepath.Join(tempDir, fmt.Sprintf("test-file-%d.dat", size)) + + file, err := os.Create(filePath) + if err != nil { + tb.Fatalf("Failed to create test file: %v", err) + } + + chunkSize := 1024 * 1024 + chunk := make([]byte, chunkSize) + bytesWritten := int64(0) + + for bytesWritten < size { + remaining := size - bytesWritten + if remaining < int64(chunkSize) { + chunk = chunk[:remaining] + } + + _, err := rand.Read(chunk) + if err != nil { + file.Close() + tb.Fatalf("Failed to generate random data: %v", err) + } + + _, err = file.Write(chunk) + if err != nil { + file.Close() + tb.Fatalf("Failed to write to test file: %v", err) + } + + bytesWritten += int64(len(chunk)) + } + + _, err = file.Seek(0, 0) + if err != nil { + file.Close() + tb.Fatalf("Failed to seek to beginning of test file: %v", err) + } + + return file +} + +func benchmarkLargeUpload(tb coreTesting.TB, ctx coreTesting.TestContext, size int64) { + testFile := createLargeTestFile(tb, size) + defer testFile.Close() + + _, requestID, userID := tusTestUtils.SetupTUSUpload(tb, ctx, testFile, nil) + + wfTest := coreTesting.NewWorkflowTest(ctx) + wf := wfTest.NewOperationWorkflow(core.TUSUploadOperationName(internal.ProtocolName)) + + workflowOptions := []core.WorkflowOption{ + core.WithWorkflowUserID(userID), + core.WithWorkflowSourceIP("127.0.0.1"), + } + + req := wfTest.GetRequest(requestID) + uploadStart := time.Now() + + wfTest.MustConvertRequestToWorkflow(requestID, wf, 0, workflowOptions...) + wfTest.ExecuteWorkflowStep(req) + wfTest.CompleteWorkflowStep(req) + + uploadDuration := time.Since(uploadStart) + tb.Logf("Upload: %v, Size: %.2f GB", uploadDuration, float64(size)/(1024*1024*1024)) + + wfTest.AssertOperationSuccess(req) + + if req.Status != models.RequestStatusCompleted { + tb.Logf("Workflow did not complete successfully, skipping pin status check: %s", req.StatusMessage) + return + } + + pinSvc := core.GetService[pluginCore.IPFSPinService](ctx, pluginCore.PIN_SERVICE) + if pinSvc == nil { + tb.Skip("Pin service not available") + } + + startTime := time.Now() + var maxPinTimeout time.Duration + disableTimeout := false + if timeoutStr := os.Getenv("TUS_BENCH_PIN_TIMEOUT"); timeoutStr != "" { + if timeoutStr == "0" { + disableTimeout = true + } else if duration, err := time.ParseDuration(timeoutStr); err == nil { + maxPinTimeout = duration + } + } else { + maxPinTimeout = 30 * time.Minute + } + + for disableTimeout || time.Since(startTime) < maxPinTimeout { + sort := []filter.Sort{{Field: "created_at", Order: filter.OrderDesc}} + pins, _, _ := pinSvc.ListPins(ctx, nil, sort, queryutil.DefaultPagination) + + for _, pin := range pins { + if pin.UserID == userID && pin.Status == db.PinningStatusPinned { + pinDuration := time.Since(startTime) + throughputMBps := float64(size) / (1024 * 1024) / uploadDuration.Seconds() + tb.Logf("Pin wait: %v, Throughput: %.2f MB/s", pinDuration, throughputMBps) + return + } + } + time.Sleep(5 * time.Second) + } + tb.Fatalf("Timeout waiting for pin status") +} + +type benchmarkTestCase struct { + name string + size int64 +} + +var benchmarkCases = []benchmarkTestCase{ + {"1GB", 1 * units.GB}, + {"10GB", 10 * units.GB}, + {"100GB", 100 * units.GB}, + {"1TB", units.TB}, +} + +func BenchmarkTUSUpload(b *testing.B) { + for _, tc := range benchmarkCases { + b.Run(tc.name, func(b *testing.B) { + if testing.Short() && tc.name == "1TB" { + b.Skip("Skipping 1TB benchmark in short mode") + } + b.ResetTimer() + for b.Loop() { + coreTesting.RunTestCaseWithDB(b, func(tb coreTesting.TB, ctx coreTesting.TestContext) { + benchmarkLargeUpload(tb, ctx, tc.size) + }, testopts.GetStandardTestOptions()...) + } + }) + } +} + +func BenchmarkTUSUpload_Profile(b *testing.B) { + if testing.Short() { + b.Skip("Skipping performance profiling in short mode") + } + if os.Getenv("TUS_BENCH_ENABLE_PROFILING") != "true" { + b.Skip("Set TUS_BENCH_ENABLE_PROFILING=true to enable profiling") + } + + profilesDir := os.Getenv("TUS_BENCH_PROFILES_DIR") + if profilesDir == "" { + profilesDir = b.TempDir() + } + + cpuProfileFile := filepath.Join(profilesDir, "cpu.prof") + memProfileFile := filepath.Join(profilesDir, "mem.prof") + + f, err := os.Create(cpuProfileFile) + if err != nil { + b.Fatalf("Could not create CPU profile: %v", err) + } + defer f.Close() + if err := pprof.StartCPUProfile(f); err != nil { + b.Fatalf("Could not start CPU profile: %v", err) + } + defer pprof.StopCPUProfile() + + for _, tc := range benchmarkCases { + b.Run(tc.name+"-Profile", func(b *testing.B) { + b.ResetTimer() + for b.Loop() { + coreTesting.RunTestCaseWithDB(b, func(tb coreTesting.TB, ctx coreTesting.TestContext) { + benchmarkLargeUpload(tb, ctx, tc.size) + }, testopts.GetStandardTestOptions()...) + } + }) + } + + f2, err := os.Create(memProfileFile) + if err != nil { + b.Fatalf("Could not create memory profile: %v", err) + } + defer f2.Close() + runtime.GC() + if err := pprof.WriteHeapProfile(f2); err != nil { + b.Fatalf("Could not write memory profile: %v", err) + } + + b.Logf("Profiles saved to %s", profilesDir) +} diff --git a/internal/protocol/tests/test_utils.go b/internal/protocol/tests/test_utils.go index 185ba6ee..dd86bf26 100644 --- a/internal/protocol/tests/test_utils.go +++ b/internal/protocol/tests/test_utils.go @@ -7,18 +7,12 @@ import ( "net/http" "time" - "go.lumeweb.com/portal-plugin-ipfs/internal/plugin" + "go.lumeweb.com/portal-plugin-ipfs/internal/testing/testopts" coreTesting "go.lumeweb.com/portal/core/testing" - serviceTesting "go.lumeweb.com/portal/service/testing" ) func GetStandardTestOptions() []coreTesting.TestContextBuilderOption { - return []coreTesting.TestContextBuilderOption{ - serviceTesting.PresetE2E(), - coreTesting.WithConfig("core.mail.host", "localhost"), - coreTesting.WithConfig("core.mail.port", 25), - coreTesting.WithPlugins(plugin.GetPluginInfoWithTemplates(nil)), - } + return testopts.GetStandardTestOptions() } // HTTPTestClient wraps an HTTP client with helper methods for testing diff --git a/internal/protocol/tests/tests_test.go b/internal/protocol/tests/tests_test.go index 5923d837..3d47d919 100644 --- a/internal/protocol/tests/tests_test.go +++ b/internal/protocol/tests/tests_test.go @@ -3,25 +3,23 @@ package tests import ( "bytes" "encoding/json" - "fmt" "io" "os" "path/filepath" "strings" "testing" - "github.com/google/uuid" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" format "github.com/ipfs/go-ipld-format" "github.com/stretchr/testify/require" - "github.com/tus/tusd/v2/pkg/handler" pluginCore "go.lumeweb.com/portal-plugin-ipfs/core" "go.lumeweb.com/portal-plugin-ipfs/internal" "go.lumeweb.com/portal-plugin-ipfs/internal/protocol" "go.lumeweb.com/portal-plugin-ipfs/internal/testing/fixtures" + pluginTusUtils "go.lumeweb.com/portal-plugin-ipfs/internal/testing/tus" pluginUpload "go.lumeweb.com/portal-plugin-ipfs/internal/upload" "go.lumeweb.com/portal/core" coreTesting "go.lumeweb.com/portal/core/testing" @@ -129,9 +127,7 @@ func assertWorkflowSuccess(wfTest *coreTesting.WorkflowTest, req *models.Request // assertTUSWorkflowSuccess performs TUS-specific workflow assertions with expected message func assertTUSWorkflowSuccess(wfTest *coreTesting.WorkflowTest, req *models.Request) { - wfTest.AssertOperationSuccess(req) - wfTest.AssertOperationStatusMessageContains(req, "Successfully completed") - wfTest.AssertOperationStatusProgress(req, 100) + pluginTusUtils.AssertTUSWorkflowSuccess(wfTest, req) } // testArchiveUpload is a helper function that tests archive uploads for a given format and mode @@ -173,62 +169,10 @@ func testArchiveUpload(t *testing.T, format contentArchive.Format, creator plugi }, finalOptions...) } -// setupTUSUpload creates a TUS upload with optional hash and returns protocol and request ID +// setupTUSUpload creates a TUS upload with optional hash and returns protocol, request ID, and user ID // hash can be nil for files where hash is not yet known (e.g., non-CAR files) -func setupTUSUpload(t *testing.T, ctx coreTesting.TestContext, uploadFile *os.File, hash core.StorageHash) (core.StorageProtocol, uint) { - tusService := core.GetService[core.TUSService](ctx, core.TUS_SERVICE) - storageSvc := core.GetService[core.StorageService](ctx, core.STORAGE_SERVICE) - proto := core.GetProtocol(internal.ProtocolName) - - // TUS Upload Setup - objectId := uuid.New().String() - uploadId := uuid.New().String() - fullId := fmt.Sprintf("%s+%s", objectId, uploadId) - - // Create TUS upload - this is TUS-specific logic - testUser, err := core.GetService[core.UserService](ctx, core.USER_SERVICE).CreateAccount(ctx, "test@example.com", "testpassword123", false) - require.NoError(t, err) - - tusUpload, err := tusService.CreateUpload( - ctx, - hash, - fullId, - testUser.ID, - "127.0.0.1", - proto.(core.StorageProtocol), - ) - require.NoError(t, err) - - err = tusService.UploadProcessing(ctx, proto.(core.StorageProtocol), tusUpload.TUSUploadID) - require.NoError(t, err) - - // Get file stats for S3 upload - fileSize, err := uploadFile.Stat() - require.NoError(t, err) - - // S3 Upload - TUS-specific logic - fileInfo := handler.FileInfo{ID: objectId, Size: fileSize.Size()} - infoData := io.NopCloser(bytes.NewReader(mustMarshal(t, fileInfo))) - err = storageSvc.S3MultipartUpload( - ctx, - infoData, - ctx.Config().Config().Core.Storage.S3.BufferBucket, - storageSvc.GetTemporaryUploadPath(proto.(core.StorageProtocol), fmt.Sprintf("%s.info", objectId)), - uint64(len(mustMarshal(t, fileInfo))), - ) - require.NoError(t, err) - - // Upload file to S3 - err = storageSvc.S3MultipartUpload( - ctx, - uploadFile, - ctx.Config().Config().Core.Storage.S3.BufferBucket, - storageSvc.GetTemporaryUploadPath(proto.(core.StorageProtocol), objectId), - uint64(fileSize.Size()), - ) - require.NoError(t, err) - - return proto.(core.StorageProtocol), tusUpload.RequestID +func setupTUSUpload(tb coreTesting.TB, ctx coreTesting.TestContext, uploadFile *os.File, hash core.StorageHash) (core.StorageProtocol, uint, uint) { + return pluginTusUtils.SetupTUSUpload(tb, ctx, uploadFile, hash) } func testUploadWorkflow(t *testing.T, ctx coreTesting.TestContext, universalReader *pluginUpload.UniversalReader, format contentArchive.Format, mode pluginUpload.ArchiveMode, operationName string, assertionFunc func(*coreTesting.WorkflowTest, *models.Request), workflowDataBuilder func(string) interface{}) { @@ -280,7 +224,7 @@ func getCARRootsFromFile(t *testing.T, file *os.File) cid.Cid { // executeTUSWorkflowHelper is a helper function that executes TUS upload workflow // It handles the common workflow execution pattern for both archives and plain files -func executeTUSWorkflowHelper(t *testing.T, ctx coreTesting.TestContext, tempFile *os.File, format contentArchive.Format, requestID uint) { +func executeTUSWorkflowHelper(t *testing.T, ctx coreTesting.TestContext, tempFile *os.File, format contentArchive.Format, requestID uint, userID uint) { wfTest := coreTesting.NewWorkflowTest(ctx) wf := wfTest.NewOperationWorkflow(core.TUSUploadOperationName(internal.ProtocolName)) @@ -292,7 +236,7 @@ func executeTUSWorkflowHelper(t *testing.T, ctx coreTesting.TestContext, tempFil workflowOptions = append(workflowOptions, core.WithWorkflowStorageHash(internal.NewIPFSHash(getCARRootsFromFile(t, tempFile)))) } workflowOptions = append(workflowOptions, - core.WithWorkflowUserID(1), // User created in setupTUSUpload + core.WithWorkflowUserID(userID), // User created in setupTUSUpload core.WithWorkflowSourceIP("127.0.0.1"), ) @@ -332,10 +276,10 @@ func runTUSFileUploadInternal(t *testing.T, ctx coreTesting.TestContext, fileCon require.NoError(t, err) // For plain files, hash is not known yet (will be computed during upload) - _, requestID := setupTUSUpload(t, ctx, tempFile, nil) + _, requestID, userID := setupTUSUpload(t, ctx, tempFile, nil) // Execute workflow using the helper - executeTUSWorkflowHelper(t, ctx, tempFile, contentArchive.FormatFile, requestID) + executeTUSWorkflowHelper(t, ctx, tempFile, contentArchive.FormatFile, requestID, userID) } // testTUSFileUpload tests plain file uploads (FormatFile) through the TUS upload workflow @@ -365,18 +309,19 @@ func runTUSArchiveUploadInternal(t *testing.T, ctx coreTesting.TestContext, form // Use appropriate setup based on format var requestID uint + var userID uint if format == contentArchive.FormatCAR { // For CAR files, we can pre-compute the hash roots, err := pluginUpload.GetCarRoots(tempFile, false) require.NoError(t, err) - _, requestID = setupTUSUpload(t, ctx, tempFile, internal.NewIPFSHash(roots[0])) + _, requestID, userID = setupTUSUpload(t, ctx, tempFile, internal.NewIPFSHash(roots[0])) } else { // For non-CAR files, hash is not known yet - _, requestID = setupTUSUpload(t, ctx, tempFile, nil) + _, requestID, userID = setupTUSUpload(t, ctx, tempFile, nil) } // Execute workflow using the helper - executeTUSWorkflowHelper(t, ctx, tempFile, format, requestID) + executeTUSWorkflowHelper(t, ctx, tempFile, format, requestID, userID) } // testTUSArchiveUpload is a TUS-specific wrapper for testArchiveUpload diff --git a/internal/testing/testopts/options.go b/internal/testing/testopts/options.go new file mode 100644 index 00000000..11a3b072 --- /dev/null +++ b/internal/testing/testopts/options.go @@ -0,0 +1,16 @@ +package testopts + +import ( + "go.lumeweb.com/portal-plugin-ipfs/internal/plugin" + coreTesting "go.lumeweb.com/portal/core/testing" + serviceTesting "go.lumeweb.com/portal/service/testing" +) + +func GetStandardTestOptions() []coreTesting.TestContextBuilderOption { + return []coreTesting.TestContextBuilderOption{ + serviceTesting.PresetE2E(), + coreTesting.WithConfig("core.mail.host", "localhost"), + coreTesting.WithConfig("core.mail.port", 25), + coreTesting.WithPlugins(plugin.GetPluginInfoWithTemplates(nil)), + } +} diff --git a/internal/testing/tus/helpers.go b/internal/testing/tus/helpers.go new file mode 100644 index 00000000..c5622014 --- /dev/null +++ b/internal/testing/tus/helpers.go @@ -0,0 +1,94 @@ +package tus + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "os" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/tus/tusd/v2/pkg/handler" + "go.lumeweb.com/portal/core" + coreTesting "go.lumeweb.com/portal/core/testing" + "go.lumeweb.com/portal/db/models" + "go.lumeweb.com/portal-plugin-ipfs/internal" +) + +func mustMarshal(tb coreTesting.TB, v interface{}) []byte { + data, err := json.Marshal(v) + require.NoError(tb, err) + return data +} + +// assertTUSWorkflowSuccess performs TUS-specific workflow assertions with expected message +func assertTUSWorkflowSuccess(wfTest *coreTesting.WorkflowTest, req *models.Request) { + wfTest.AssertOperationSuccess(req) + wfTest.AssertOperationStatusMessageContains(req, "Successfully completed") + wfTest.AssertOperationStatusProgress(req, 100) +} + +// SetupTUSUpload creates a TUS upload with optional hash and returns protocol, request ID, and user ID +// hash can be nil for files where hash is not yet known (e.g., non-CAR files) +func SetupTUSUpload(tb coreTesting.TB, ctx coreTesting.TestContext, uploadFile *os.File, hash core.StorageHash) (core.StorageProtocol, uint, uint) { + tb.Helper() + tusService := core.GetService[core.TUSService](ctx, core.TUS_SERVICE) + storageSvc := core.GetService[core.StorageService](ctx, core.STORAGE_SERVICE) + proto := core.GetProtocol(internal.ProtocolName) + + // TUS Upload Setup + objectId := uuid.New().String() + uploadId := uuid.New().String() + fullId := fmt.Sprintf("%s+%s", objectId, uploadId) + + // Create TUS upload + testUser, err := core.GetService[core.UserService](ctx, core.USER_SERVICE).CreateAccount(ctx, "test@example.com", "testpassword123", false) + require.NoError(tb, err) + + tusUpload, err := tusService.CreateUpload( + ctx, + hash, + fullId, + testUser.ID, + "127.0.0.1", + proto.(core.StorageProtocol), + ) + require.NoError(tb, err) + + err = tusService.UploadProcessing(ctx, proto.(core.StorageProtocol), tusUpload.TUSUploadID) + require.NoError(tb, err) + + // Get file stats for S3 upload + fileSize, err := uploadFile.Stat() + require.NoError(tb, err) + + // S3 Upload + fileInfo := handler.FileInfo{ID: objectId, Size: fileSize.Size()} + infoData := io.NopCloser(bytes.NewReader(mustMarshal(tb, fileInfo))) + err = storageSvc.S3MultipartUpload( + ctx, + infoData, + ctx.Config().Config().Core.Storage.S3.BufferBucket, + storageSvc.GetTemporaryUploadPath(proto.(core.StorageProtocol), fmt.Sprintf("%s.info", objectId)), + uint64(len(mustMarshal(tb, fileInfo))), + ) + require.NoError(tb, err) + + // Upload file to S3 + err = storageSvc.S3MultipartUpload( + ctx, + uploadFile, + ctx.Config().Config().Core.Storage.S3.BufferBucket, + storageSvc.GetTemporaryUploadPath(proto.(core.StorageProtocol), objectId), + uint64(fileSize.Size()), + ) + require.NoError(tb, err) + + return proto.(core.StorageProtocol), tusUpload.RequestID, testUser.ID +} + +// AssertTUSWorkflowSuccess is exported version for test packages +func AssertTUSWorkflowSuccess(wfTest *coreTesting.WorkflowTest, req *models.Request) { + assertTUSWorkflowSuccess(wfTest, req) +}