Skip to content
Merged
54 changes: 53 additions & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,56 @@ jobs:
run: go generate ./...

- name: Test
run: go test ./...
run: go test $(go list ./... | grep -v bench_tests)

benchmark:
runs-on: blacksmith-4vcpu-ubuntu-2404

steps:
- uses: actions/checkout@v6

- name: Get latest Kubo version
id: kubo
run: echo "version=$(gh release list --repo ipfs/kubo --limit 1 | awk '{print $1}')" >> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}

- name: Download Kubo
run: wget https://github.com/ipfs/kubo/releases/download/${{ steps.kubo.outputs.version }}/kubo_${{ steps.kubo.outputs.version }}_linux-amd64.tar.gz

- name: Extract and install Kubo
run: |
tar -xzf kubo_${{ steps.kubo.outputs.version }}_linux-amd64.tar.gz
sudo cp kubo/ipfs /usr/local/bin/
rm -rf kubo kubo_${{ steps.kubo.outputs.version }}_linux-amd64.tar.gz
ipfs init

- name: Add /usr/local/bin to PATH
run: echo "/usr/local/bin" >> $GITHUB_PATH

- name: Setup Go
uses: actions/setup-go@v6
with:
go-version-file: go.mod
cache: true

- name: Download go modules
run: go mod download

- name: Fix go mod cache permissions
run: chmod -R 777 $(go list -mod=mod -m -f '{{.Dir}}' go.lumeweb.com/ipfs-content)/internal/testing/fixtures

- name: Generate
run: go generate ./...

- name: Run Benchmarks
run: |
mkdir -p benchmark-results
TUS_BENCH_ENABLE_PROFILING=true TUS_BENCH_PROFILES_DIR=benchmark-results go test -bench=. -benchmem -run=^$ ./internal/protocol/bench_tests/

- name: Upload Benchmark Results
uses: actions/upload-artifact@v4
with:
name: benchmark-results
path: benchmark-results/
retention-days: 30
204 changes: 204 additions & 0 deletions internal/protocol/bench_tests/op_tus_upload_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package bench_tests

import (
	"fmt"
	"io"
	"math/rand"
	"os"
	"path/filepath"
	"runtime"
	"runtime/pprof"
	"testing"
	"time"

	"github.com/docker/go-units"

	pluginCore "go.lumeweb.com/portal-plugin-ipfs/core"
	"go.lumeweb.com/portal-plugin-ipfs/internal"
	"go.lumeweb.com/portal-plugin-ipfs/internal/db"
	tusTestUtils "go.lumeweb.com/portal-plugin-ipfs/internal/testing/tus"
	"go.lumeweb.com/portal-plugin-ipfs/internal/testing/util"
	"go.lumeweb.com/portal/core"
	coreTesting "go.lumeweb.com/portal/core/testing"
	"go.lumeweb.com/queryutil"
	"go.lumeweb.com/queryutil/filter"
)

// createLargeTestFile creates a file of exactly size bytes of pseudo-random
// data in tb.TempDir() (so the framework cleans it up) and returns it opened
// with the read offset rewound to the start. Any failure aborts via tb.Fatalf.
func createLargeTestFile(tb coreTesting.TB, size int64) *os.File {
	tb.Helper()

	tempDir := tb.TempDir()
	filePath := filepath.Join(tempDir, fmt.Sprintf("test-file-%d.dat", size))

	file, err := os.Create(filePath)
	if err != nil {
		tb.Fatalf("Failed to create test file: %v", err)
	}

	// Write in 1 MiB chunks so at most one chunk is held in memory, even
	// when generating multi-gigabyte fixtures.
	const chunkSize = 1 << 20
	chunk := make([]byte, chunkSize)

	for bytesWritten := int64(0); bytesWritten < size; {
		if remaining := size - bytesWritten; remaining < chunkSize {
			chunk = chunk[:remaining] // final, partial chunk
		}

		// math/rand.Read is documented to always return a nil error; the
		// check is kept for symmetry with the write below.
		if _, err := rand.Read(chunk); err != nil {
			file.Close()
			tb.Fatalf("Failed to generate random data: %v", err)
		}

		if _, err := file.Write(chunk); err != nil {
			file.Close()
			tb.Fatalf("Failed to write to test file: %v", err)
		}

		bytesWritten += int64(len(chunk))
	}

	// Rewind so callers can stream the fixture from the beginning.
	if _, err := file.Seek(0, io.SeekStart); err != nil {
		file.Close()
		tb.Fatalf("Failed to seek to beginning of test file: %v", err)
	}

	return file
}

func benchmarkLargeUpload(tb coreTesting.TB, ctx coreTesting.TestContext, size int64) {
testFile := createLargeTestFile(tb, size)
defer testFile.Close()

_, requestID := tusTestUtils.SetupTUSUpload(tb, ctx, testFile, nil)

wfTest := coreTesting.NewWorkflowTest(ctx)
wf := wfTest.NewOperationWorkflow(core.TUSUploadOperationName(internal.ProtocolName))

workflowOptions := []core.WorkflowOption{
core.WithWorkflowUserID(1),
Comment thread
pcfreak30 marked this conversation as resolved.
Outdated
core.WithWorkflowSourceIP("127.0.0.1"),
}

req := wfTest.GetRequest(requestID)
uploadStart := time.Now()

wfTest.MustConvertRequestToWorkflow(requestID, wf, 0, workflowOptions...)
wfTest.ExecuteWorkflowStep(req)
wfTest.CompleteWorkflowStep(req)

uploadDuration := time.Since(uploadStart)
tb.Logf("Upload: %v, Size: %.2f GB", uploadDuration, float64(size)/(1024*1024*1024))

tusTestUtils.AssertTUSWorkflowSuccess(wfTest, req)

pinSvc := core.GetService[pluginCore.IPFSPinService](ctx, pluginCore.PIN_SERVICE)
if pinSvc == nil {
tb.Skip("Pin service not available")
}

startTime := time.Now()
maxPinTimeout := 30 * time.Minute
if timeoutStr := os.Getenv("TUS_BENCH_PIN_TIMEOUT"); timeoutStr != "" {
if duration, err := time.ParseDuration(timeoutStr); err == nil {
maxPinTimeout = duration
}
}

for time.Since(startTime) < maxPinTimeout {
sort := []filter.Sort{{Field: "created_at", Order: filter.OrderDesc}}
pins, _, _ := pinSvc.ListPins(ctx, nil, sort, queryutil.DefaultPagination)

for _, pin := range pins {
if pin.UserID == 1 && pin.Status == db.PinningStatusPinned {
pinDuration := time.Since(startTime)
throughputMBps := float64(size) / (1024 * 1024) / uploadDuration.Seconds()
tb.Logf("Pin wait: %v, Throughput: %.2f MB/s", pinDuration, throughputMBps)
return
}
}
time.Sleep(5 * time.Second)
}
tb.Fatalf("Timeout waiting for pin status")
}

// benchmarkTestCase names one upload-size scenario for the TUS benchmarks.
type benchmarkTestCase struct {
	name string // sub-benchmark name, e.g. "1GB"
	size int64  // upload size in bytes
}

// benchmarkCases spans three orders of magnitude of upload size; the 1TB
// case is skipped under -short by BenchmarkTUSUpload.
var benchmarkCases = []benchmarkTestCase{
	{"1GB", 1 * units.GB},
	{"10GB", 10 * units.GB},
	{"100GB", 100 * units.GB},
	{"1TB", 1 * units.TB},
}

// BenchmarkTUSUpload measures the end-to-end TUS upload + pin time for each
// size in benchmarkCases, running each iteration against a fresh test DB.
// The 1TB case is skipped under -short.
func BenchmarkTUSUpload(b *testing.B) {
	for _, tc := range benchmarkCases {
		b.Run(tc.name, func(b *testing.B) {
			if testing.Short() && tc.name == "1TB" {
				b.Skip("Skipping 1TB benchmark in short mode")
			}
			// b.Loop resets the timer on its first call, so no explicit
			// b.ResetTimer is needed before the loop.
			for b.Loop() {
				coreTesting.RunTestCaseWithDB(b, func(tb coreTesting.TB, ctx coreTesting.TestContext) {
					benchmarkLargeUpload(tb, ctx, tc.size)
				}, util.GetStandardTestOptions()...)
			}
		})
	}
}

// BenchmarkTUSUpload_Profile runs the same size matrix as BenchmarkTUSUpload
// while capturing CPU and heap profiles. It is gated behind
// TUS_BENCH_ENABLE_PROFILING=true; profiles are written to
// TUS_BENCH_PROFILES_DIR (or a temp dir when unset) as cpu.prof / mem.prof.
func BenchmarkTUSUpload_Profile(b *testing.B) {
	if testing.Short() {
		b.Skip("Skipping performance profiling in short mode")
	}
	if os.Getenv("TUS_BENCH_ENABLE_PROFILING") != "true" {
		b.Skip("Set TUS_BENCH_ENABLE_PROFILING=true to enable profiling")
	}

	profilesDir := os.Getenv("TUS_BENCH_PROFILES_DIR")
	if profilesDir == "" {
		profilesDir = b.TempDir()
	}

	cpuProfileFile := filepath.Join(profilesDir, "cpu.prof")
	memProfileFile := filepath.Join(profilesDir, "mem.prof")

	// CPU profiling covers all sub-benchmarks; StopCPUProfile runs via defer
	// after the heap profile below has been written (LIFO defer order).
	f, err := os.Create(cpuProfileFile)
	if err != nil {
		b.Fatalf("Could not create CPU profile: %v", err)
	}
	defer f.Close()
	if err := pprof.StartCPUProfile(f); err != nil {
		b.Fatalf("Could not start CPU profile: %v", err)
	}
	defer pprof.StopCPUProfile()

	for _, tc := range benchmarkCases {
		b.Run(tc.name+"-Profile", func(b *testing.B) {
			// b.Loop resets the timer on its first call; no ResetTimer needed.
			for b.Loop() {
				coreTesting.RunTestCaseWithDB(b, func(tb coreTesting.TB, ctx coreTesting.TestContext) {
					benchmarkLargeUpload(tb, ctx, tc.size)
				}, util.GetStandardTestOptions()...)
			}
		})
	}

	// Capture the heap profile after all runs; force a GC first so the
	// profile reflects live objects rather than collectable garbage.
	f2, err := os.Create(memProfileFile)
	if err != nil {
		b.Fatalf("Could not create memory profile: %v", err)
	}
	defer f2.Close()
	runtime.GC()
	if err := pprof.WriteHeapProfile(f2); err != nil {
		b.Fatalf("Could not write memory profile: %v", err)
	}

	b.Logf("Profiles saved to %s", profilesDir)
}
10 changes: 2 additions & 8 deletions internal/protocol/tests/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,12 @@ import (
"net/http"
"time"

"go.lumeweb.com/portal-plugin-ipfs/internal/plugin"
"go.lumeweb.com/portal-plugin-ipfs/internal/testing/util"
coreTesting "go.lumeweb.com/portal/core/testing"
serviceTesting "go.lumeweb.com/portal/service/testing"
)

func GetStandardTestOptions() []coreTesting.TestContextBuilderOption {
return []coreTesting.TestContextBuilderOption{
serviceTesting.PresetE2E(),
coreTesting.WithConfig("core.mail.host", "localhost"),
coreTesting.WithConfig("core.mail.port", 25),
coreTesting.WithPlugins(plugin.GetPluginInfoWithTemplates(nil)),
}
return util.GetStandardTestOptions()
}

// HTTPTestClient wraps an HTTP client with helper methods for testing
Expand Down
64 changes: 4 additions & 60 deletions internal/protocol/tests/tests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@ package tests
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"testing"

"github.com/google/uuid"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
format "github.com/ipfs/go-ipld-format"
"github.com/stretchr/testify/require"
"github.com/tus/tusd/v2/pkg/handler"
pluginCore "go.lumeweb.com/portal-plugin-ipfs/core"
"go.lumeweb.com/portal-plugin-ipfs/internal"
"go.lumeweb.com/portal-plugin-ipfs/internal/protocol"
"go.lumeweb.com/portal-plugin-ipfs/internal/testing/fixtures"
pluginTusUtils "go.lumeweb.com/portal-plugin-ipfs/internal/testing/tus"
pluginUpload "go.lumeweb.com/portal-plugin-ipfs/internal/upload"
"go.lumeweb.com/portal/core"
coreTesting "go.lumeweb.com/portal/core/testing"
Expand Down Expand Up @@ -129,9 +127,7 @@ func assertWorkflowSuccess(wfTest *coreTesting.WorkflowTest, req *models.Request

// assertTUSWorkflowSuccess performs TUS-specific workflow assertions with expected message
func assertTUSWorkflowSuccess(wfTest *coreTesting.WorkflowTest, req *models.Request) {
wfTest.AssertOperationSuccess(req)
wfTest.AssertOperationStatusMessageContains(req, "Successfully completed")
wfTest.AssertOperationStatusProgress(req, 100)
pluginTusUtils.AssertTUSWorkflowSuccess(wfTest, req)
}

// testArchiveUpload is a helper function that tests archive uploads for a given format and mode
Expand Down Expand Up @@ -175,60 +171,8 @@ func testArchiveUpload(t *testing.T, format contentArchive.Format, creator plugi

// setupTUSUpload creates a TUS upload with optional hash and returns protocol and request ID
// hash can be nil for files where hash is not yet known (e.g., non-CAR files)
func setupTUSUpload(t *testing.T, ctx coreTesting.TestContext, uploadFile *os.File, hash core.StorageHash) (core.StorageProtocol, uint) {
tusService := core.GetService[core.TUSService](ctx, core.TUS_SERVICE)
storageSvc := core.GetService[core.StorageService](ctx, core.STORAGE_SERVICE)
proto := core.GetProtocol(internal.ProtocolName)

// TUS Upload Setup
objectId := uuid.New().String()
uploadId := uuid.New().String()
fullId := fmt.Sprintf("%s+%s", objectId, uploadId)

// Create TUS upload - this is TUS-specific logic
testUser, err := core.GetService[core.UserService](ctx, core.USER_SERVICE).CreateAccount(ctx, "test@example.com", "testpassword123", false)
require.NoError(t, err)

tusUpload, err := tusService.CreateUpload(
ctx,
hash,
fullId,
testUser.ID,
"127.0.0.1",
proto.(core.StorageProtocol),
)
require.NoError(t, err)

err = tusService.UploadProcessing(ctx, proto.(core.StorageProtocol), tusUpload.TUSUploadID)
require.NoError(t, err)

// Get file stats for S3 upload
fileSize, err := uploadFile.Stat()
require.NoError(t, err)

// S3 Upload - TUS-specific logic
fileInfo := handler.FileInfo{ID: objectId, Size: fileSize.Size()}
infoData := io.NopCloser(bytes.NewReader(mustMarshal(t, fileInfo)))
err = storageSvc.S3MultipartUpload(
ctx,
infoData,
ctx.Config().Config().Core.Storage.S3.BufferBucket,
storageSvc.GetTemporaryUploadPath(proto.(core.StorageProtocol), fmt.Sprintf("%s.info", objectId)),
uint64(len(mustMarshal(t, fileInfo))),
)
require.NoError(t, err)

// Upload file to S3
err = storageSvc.S3MultipartUpload(
ctx,
uploadFile,
ctx.Config().Config().Core.Storage.S3.BufferBucket,
storageSvc.GetTemporaryUploadPath(proto.(core.StorageProtocol), objectId),
uint64(fileSize.Size()),
)
require.NoError(t, err)

return proto.(core.StorageProtocol), tusUpload.RequestID
func setupTUSUpload(tb coreTesting.TB, ctx coreTesting.TestContext, uploadFile *os.File, hash core.StorageHash) (core.StorageProtocol, uint) {
return pluginTusUtils.SetupTUSUpload(tb, ctx, uploadFile, hash)
}

func testUploadWorkflow(t *testing.T, ctx coreTesting.TestContext, universalReader *pluginUpload.UniversalReader, format contentArchive.Format, mode pluginUpload.ArchiveMode, operationName string, assertionFunc func(*coreTesting.WorkflowTest, *models.Request), workflowDataBuilder func(string) interface{}) {
Expand Down
Loading
Loading